forge/runtime.rs

//! FORGE - The Rust Full-Stack Framework
//!
//! Single binary runtime that provides:
//! - HTTP Gateway with RPC endpoints
//! - SSE server for real-time subscriptions
//! - Background job workers
//! - Cron scheduler
//! - Workflow engine
//! - Cluster coordination
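//!
//! # Example
//!
//! A minimal sketch of wiring up and running the runtime, assuming the crate is
//! consumed as `forge`; the commented-out registered types are illustrative and
//! not defined in this file:
//!
//! ```ignore
//! use forge::prelude::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Any ForgeConfig source works; this helper is also used in the tests below.
//!     let config = ForgeConfig::default_with_database_url("postgres://localhost/app");
//!
//!     let mut builder = Forge::builder().config(config);
//!     // builder.register_query::<ListUsers>();   // hypothetical query type
//!     // builder.register_job::<SendEmail>();     // hypothetical job type
//!
//!     builder.build()?.run().await
//! }
//! ```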

use std::future::Future;
use std::net::IpAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use axum::Router;
use axum::body::Body;
use axum::http::Request;
use axum::response::Response;
use tokio::sync::broadcast;

use forge_core::CircuitBreakerClient;
use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
use forge_core::error::{ForgeError, Result};
use forge_core::function::{ForgeMutation, ForgeQuery};
use forge_core::mcp::ForgeMcpTool;
use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};

use forge_runtime::cluster::{
    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
    ShutdownConfig,
};
use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
use forge_runtime::db::Database;
use forge_runtime::function::FunctionRegistry;
use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
use forge_runtime::mcp::McpToolRegistry;
use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
use forge_runtime::workflow::{
    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
};
use tokio_util::sync::CancellationToken;

/// Type alias for frontend handler function.
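///
/// A handler is a plain `fn` returning a boxed future. A minimal sketch (the
/// response body is illustrative):
///
/// ```ignore
/// use std::{future::Future, pin::Pin};
/// use axum::{body::Body, http::Request, response::{Html, IntoResponse, Response}};
///
/// fn serve_index(_req: Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>> {
///     Box::pin(async { Html("<h1>app</h1>").into_response() })
/// }
/// ```
///
/// Pass such a function to [`ForgeBuilder::frontend_handler`].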
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;

/// Prelude module for common imports.
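///
/// Intended to be glob-imported in user code (crate name assumed to be `forge`):
///
/// ```ignore
/// use forge::prelude::*;
/// ```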
pub mod prelude {
    // Common types
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serde re-exports for user code
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Timestamp type alias for convenience.
    pub type Timestamp = DateTime<Utc>;

    // Core types
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Same axum version the runtime uses, avoids type mismatches in custom handlers
    pub use axum;

    pub use crate::{Forge, ForgeBuilder};
}

/// The main FORGE runtime.
pub struct Forge {
    config: ForgeConfig,
    db: Option<Database>,
    node_id: NodeId,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: Arc<CronRegistry>,
    workflow_registry: WorkflowRegistry,
    daemon_registry: Arc<DaemonRegistry>,
    webhook_registry: Arc<WebhookRegistry>,
    shutdown_tx: broadcast::Sender<()>,
    /// Path to user migrations directory (default: ./migrations).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional frontend handler for embedded SPA.
    frontend_handler: Option<FrontendHandler>,
    /// Custom axum routes merged into the top-level router.
    custom_routes: Option<Router>,
}

impl Forge {
    /// Create a new builder for configuring FORGE.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Get the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Get the function registry mutably.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get the MCP tool registry mutably.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Get the job registry mutably.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get the cron registry.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Get the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Get the workflow registry mutably.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get the daemon registry.
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Get the webhook registry.
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Run the FORGE server.
    pub async fn run(mut self) -> Result<()> {
        // Users shouldn't need tracing_subscriber boilerplate to see logs
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Subscriber already exists, user set one up manually
            }
            Err(e) => {
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        // Connect to database
        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Run migrations with mesh-safe locking
        // This acquires an advisory lock, so only one node runs migrations at a time
        let runner = MigrationRunner::new(pool.clone());

        // Load user migrations from directory + any programmatic ones
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Get local node info
        let hostname = hostname::get()
            .map(|h| h.to_string_lossy().to_string())
            .unwrap_or_else(|_| "unknown".to_string());

        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        let node_id = node_info.id;
        self.node_id = node_id;

        // Create node registry
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Register node in cluster
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        // Set node status to active
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Create leader election for scheduler role
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Try to become leader
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Create graceful shutdown coordinator
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Create HTTP client with circuit breaker for actions and crons
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Start background tasks based on roles
        let mut handles = Vec::new();

        // Start heartbeat loop
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Start leader election loop if scheduler role
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Start job worker if worker role
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Start cron runner if scheduler role; leadership is checked via the leader election in the config
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Start workflow scheduler if scheduler role
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(pool.clone()));
            let scheduler = WorkflowScheduler::new(
                pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Create job dispatcher and workflow executor for dispatch capabilities
        let job_queue_for_dispatch = JobQueue::new(pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            pool.clone(),
            http_client.clone(),
        ));

        // Start daemon runner if scheduler role (daemons run as singletons)
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // Reactor handle for shutdown
        let mut reactor_handle = None;

        // PORT env var overrides config (used by --backend-port in forge dev)
        let gateway_port = std::env::var("PORT")
            .ok()
            .and_then(|p| p.parse::<u16>().ok())
            .unwrap_or(self.config.gateway.port);

        // Start HTTP gateway if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: gateway_port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
            };

            // Build gateway server (pass Database wrapper for read replica routing)
            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db.clone().expect("Database must be initialized"),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Start the reactor for real-time updates
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            // Build API router (all under /_api)
            let api_router = gateway.router();

            // Build final router with API
            let mut router = Router::new().nest("/_api", api_router);

            // Mount webhook routes under /_api (bypasses gateway auth middleware)
            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // Webhook routes need their own CORS layer since they're outside the API router.
                // Reuse gateway CORS policy rather than forcing wildcard access.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    CorsLayer::new()
                };

                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // Merge custom routes before frontend fallback so they take precedence
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Add frontend handler as fallback if configured
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = gateway_port,
            "Forge started"
        );

        // Wait for shutdown signal
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        // Graceful shutdown
        tracing::debug!("Graceful shutdown starting");

        // Stop workflow scheduler
        workflow_shutdown_token.cancel();

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        // Stop leader election
        if let Some(ref election) = leader_election {
            election.stop();
        }

        // Stop reactor before closing database
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        // Close database connections
        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Request shutdown.
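    ///
    /// Sends on the internal shutdown channel; a running [`Forge::run`] call
    /// observes it and begins graceful shutdown.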
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}

/// Builder for configuring the FORGE runtime.
pub struct ForgeBuilder {
    config: Option<ForgeConfig>,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    frontend_handler: Option<FrontendHandler>,
    custom_routes: Option<Router>,
}

impl ForgeBuilder {
    /// Create a new builder.
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            mcp_registry: McpToolRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            daemon_registry: DaemonRegistry::new(),
            webhook_registry: WebhookRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
            custom_routes: None,
        }
    }

    /// Set the directory to load migrations from.
    ///
    /// Defaults to `./migrations`. Migration files should be named like:
    /// - `0001_create_users.sql`
    /// - `0002_add_posts.sql`
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a migration programmatically.
    ///
    /// Use this for migrations that need to be generated at runtime,
    /// or for testing. For most cases, use migration files instead.
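    ///
    /// A sketch (the migration name and SQL are illustrative):
    ///
    /// ```ignore
    /// let builder = Forge::builder()
    ///     .migration("0003_add_posts_index", "CREATE INDEX idx_posts_user ON posts(user_id);");
    /// ```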
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Set a frontend handler for serving embedded SPA assets.
    ///
    /// Use with the `embedded-frontend` feature to build a single binary
    /// that includes both backend and frontend.
    pub fn frontend_handler(&mut self, handler: FrontendHandler) {
        self.frontend_handler = Some(handler);
    }

    /// Add custom axum routes to the server.
    ///
    /// Routes are merged at the top level, outside `/_api`, giving full
    /// control over headers, extractors, and response types. Avoid paths
    /// starting with `/_api` as they conflict with internal routes.
    ///
    /// ```ignore
    /// use axum::{Router, routing::get};
    ///
    /// let routes = Router::new()
    ///     .route("/custom/health", get(|| async { "ok" }));
    ///
    /// builder.custom_routes(routes);
    /// ```
    pub fn custom_routes(&mut self, router: Router) {
        self.custom_routes = Some(router);
    }

    /// Set the configuration.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Get mutable access to the function registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Get mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get mutable access to the daemon registry.
    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    /// Get mutable access to the webhook registry.
    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }

    /// Register a query function.
    pub fn register_query<Q: ForgeQuery>(&mut self) -> &mut Self
    where
        Q::Args: serde::de::DeserializeOwned + Send + 'static,
        Q::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_query::<Q>();
        self
    }

    /// Register a mutation function.
    pub fn register_mutation<M: ForgeMutation>(&mut self) -> &mut Self
    where
        M::Args: serde::de::DeserializeOwned + Send + 'static,
        M::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_mutation::<M>();
        self
    }

    /// Register a background job.
    pub fn register_job<J: forge_core::ForgeJob>(&mut self) -> &mut Self
    where
        J::Args: serde::de::DeserializeOwned + Send + 'static,
        J::Output: serde::Serialize + Send + 'static,
    {
        self.job_registry.register::<J>();
        self
    }

    /// Register a cron handler.
    pub fn register_cron<C: forge_core::ForgeCron>(&mut self) -> &mut Self {
        self.cron_registry.register::<C>();
        self
    }

    /// Register a workflow.
    pub fn register_workflow<W: forge_core::ForgeWorkflow>(&mut self) -> &mut Self
    where
        W::Input: serde::de::DeserializeOwned,
        W::Output: serde::Serialize,
    {
        self.workflow_registry.register::<W>();
        self
    }

    /// Register a daemon.
    pub fn register_daemon<D: forge_core::ForgeDaemon>(&mut self) -> &mut Self {
        self.daemon_registry.register::<D>();
        self
    }

    /// Register a webhook.
    pub fn register_webhook<W: forge_core::ForgeWebhook>(&mut self) -> &mut Self {
        self.webhook_registry.register::<W>();
        self
    }

    /// Build the FORGE runtime.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;

        let (shutdown_tx, _) = broadcast::channel(1);

        Ok(Forge {
            config,
            db: None,
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            mcp_registry: self.mcp_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            daemon_registry: Arc::new(self.daemon_registry),
            webhook_registry: Arc::new(self.webhook_registry),
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            frontend_handler: self.frontend_handler,
            custom_routes: self.custom_routes,
        })
    }
}

impl Default for ForgeBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Convert config NodeRole to cluster NodeRole.
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Gateway => NodeRole::Gateway,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let mut builder = ForgeBuilder::new();
        builder.register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}