
forge/runtime.rs

//! FORGE - The Rust Full-Stack Framework
//!
//! Single binary runtime that provides:
//! - HTTP Gateway with RPC endpoints
//! - SSE server for real-time subscriptions
//! - Background job workers
//! - Cron scheduler
//! - Workflow engine
//! - Cluster coordination
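//!
//! A minimal startup sketch, assuming this crate is consumed as `forge`, a
//! tokio runtime, and a reachable Postgres instance at the given URL:
//!
//! ```ignore
//! use forge::prelude::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let config = ForgeConfig::default_with_database_url("postgres://localhost/app");
//!     Forge::builder().config(config).build()?.run().await
//! }
//! ```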

use std::future::Future;
use std::net::IpAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use axum::Router;
use axum::body::Body;
use axum::http::Request;
use axum::response::Response;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use forge_core::CircuitBreakerClient;
use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
use forge_core::error::{ForgeError, Result};
use forge_core::mcp::ForgeMcpTool;

use forge_runtime::cluster::{
    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
    ShutdownConfig,
};
use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
use forge_runtime::db::Database;
use forge_runtime::function::FunctionRegistry;
use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
use forge_runtime::mcp::McpToolRegistry;
use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
use forge_runtime::workflow::{
    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
};

/// Type alias for a frontend handler function.
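///
/// A sketch of a conforming handler; the asset lookup is left as a
/// placeholder and `spa_handler` is a hypothetical name:
///
/// ```ignore
/// use std::{future::Future, pin::Pin};
/// use axum::{body::Body, http::Request, response::Response};
///
/// fn spa_handler(req: Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>> {
///     Box::pin(async move {
///         // Look up the embedded asset for req.uri().path() here.
///         Response::new(Body::from("<!doctype html>"))
///     })
/// }
/// ```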
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;

/// Prelude module for common imports.
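///
/// A typical glob import, assuming the crate is consumed under the name
/// `forge` (adjust to your project):
///
/// ```ignore
/// use forge::prelude::*;
/// ```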
pub mod prelude {
    // Common types
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serde re-exports for user code
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Timestamp type alias for convenience.
    pub type Timestamp = DateTime<Utc>;

    // Core types
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Re-export the same axum version the runtime uses, which avoids type
    // mismatches in custom handlers.
    pub use axum;

    pub use crate::{Forge, ForgeBuilder};
}

/// The main FORGE runtime.
pub struct Forge {
    config: ForgeConfig,
    db: Option<Database>,
    node_id: NodeId,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: Arc<CronRegistry>,
    workflow_registry: WorkflowRegistry,
    daemon_registry: Arc<DaemonRegistry>,
    webhook_registry: Arc<WebhookRegistry>,
    shutdown_tx: broadcast::Sender<()>,
    /// Path to the user migrations directory (default: `./migrations`).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional frontend handler for an embedded SPA.
    frontend_handler: Option<FrontendHandler>,
    /// Custom axum routes merged into the top-level router.
    custom_routes: Option<Router>,
}

impl Forge {
    /// Create a new builder for configuring FORGE.
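    ///
    /// A minimal sketch, reusing the config helper the test module below
    /// relies on:
    ///
    /// ```ignore
    /// let forge = Forge::builder()
    ///     .config(ForgeConfig::default_with_database_url("postgres://localhost/app"))
    ///     .build()?;
    /// ```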
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Get the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Get the function registry mutably.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get the MCP tool registry mutably.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Get the job registry mutably.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get the cron registry.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Get the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Get the workflow registry mutably.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get the daemon registry.
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Get the webhook registry.
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Run the FORGE server.
    pub async fn run(mut self) -> Result<()> {
        // Initialize telemetry so users don't need tracing_subscriber
        // boilerplate to see logs.
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // A subscriber already exists; the user set one up manually.
            }
            Err(e) => {
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        // Connect to database
        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Run migrations with mesh-safe locking: the runner acquires an
        // advisory lock, so only one node runs migrations at a time.
        let runner = MigrationRunner::new(pool.clone());

        // Load user migrations from the directory, plus any programmatic ones
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Gather local node info
        let hostname = hostname::get()
            .map(|h| h.to_string_lossy().to_string())
            .unwrap_or_else(|_| "unknown".to_string());

        // Advertised address; currently fixed to the loopback placeholder.
        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        let node_id = node_info.id;
        self.node_id = node_id;

        // Create node registry
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Register node in cluster
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        // Set node status to active
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Create leader election for scheduler role
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Try to become leader
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Create graceful shutdown coordinator
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Create HTTP client with circuit breaker for actions and crons
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Start background tasks based on roles
        let mut handles = Vec::new();

        // Start heartbeat loop
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Start leader election loop if scheduler role
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Start job worker if worker role
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Start cron runner if scheduler role (the runner itself gates
        // execution on leader election when one is present)
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                // With no election handle, assume leadership locally.
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Start workflow scheduler if scheduler role
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(pool.clone()));
            let scheduler = WorkflowScheduler::new(
                pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Start daemon runner if scheduler role (daemons run as singletons)
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // Reactor handle, kept so it can be stopped during shutdown
        let mut reactor_handle = None;

        // Create job dispatcher and workflow executor for dispatch capabilities
        let job_queue = JobQueue::new(pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(job_queue, self.job_registry.clone()));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            pool.clone(),
            http_client.clone(),
        ));

        // PORT env var overrides config (used by --backend-port in forge dev)
        let gateway_port = std::env::var("PORT")
            .ok()
            .and_then(|p| p.parse::<u16>().ok())
            .unwrap_or(self.config.gateway.port);

        // Start HTTP gateway if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: gateway_port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
            };

            // Build gateway server (pass Database wrapper for read replica routing)
            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db.clone().expect("Database must be initialized"),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Start the reactor for real-time updates
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            // Build API router (all under /_api)
            let api_router = gateway.router();

            // Build final router with API
            let mut router = Router::new().nest("/_api", api_router);

            // Mount webhook routes under /_api (bypasses gateway auth middleware)
            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // Webhook routes need their own CORS layer since they're outside the API router.
                // Reuse gateway CORS policy rather than forcing wildcard access.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    CorsLayer::new()
                };

                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // Merge custom routes before frontend fallback so they take precedence
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Add frontend handler as fallback if configured
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = gateway_port,
            "Forge started"
        );

        // Wait for shutdown signal
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        // Graceful shutdown
        tracing::debug!("Graceful shutdown starting");

        // Stop workflow scheduler
        workflow_shutdown_token.cancel();

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        // Stop leader election
        if let Some(ref election) = leader_election {
            election.stop();
        }

        // Stop reactor before closing database
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        // Close database connections
        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Request shutdown.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}

/// Builder for configuring the FORGE runtime.
pub struct ForgeBuilder {
    config: Option<ForgeConfig>,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    frontend_handler: Option<FrontendHandler>,
    custom_routes: Option<Router>,
}

impl ForgeBuilder {
    /// Create a new builder.
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            mcp_registry: McpToolRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            daemon_registry: DaemonRegistry::new(),
            webhook_registry: WebhookRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
            custom_routes: None,
        }
    }

    /// Set the directory to load migrations from.
    ///
    /// Defaults to `./migrations`. Migration files should be named like:
    /// - `0001_create_users.sql`
    /// - `0002_add_posts.sql`
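    ///
    /// For example, pointing at a non-default directory:
    ///
    /// ```ignore
    /// let builder = Forge::builder().migrations_dir("db/migrations");
    /// ```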
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a migration programmatically.
    ///
    /// Use this for migrations that need to be generated at runtime,
    /// or for testing. For most cases, use migration files instead.
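    ///
    /// A minimal sketch (the table is illustrative):
    ///
    /// ```ignore
    /// let builder = Forge::builder()
    ///     .migration("0001_create_users", "CREATE TABLE users (id UUID PRIMARY KEY)");
    /// ```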
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Set a frontend handler for serving embedded SPA assets.
    ///
    /// Use with the `embedded-frontend` feature to build a single binary
    /// that includes both the backend and the frontend.
    pub fn frontend_handler(&mut self, handler: FrontendHandler) {
        self.frontend_handler = Some(handler);
    }

    /// Add custom axum routes to the server.
    ///
    /// Routes are merged at the top level, outside `/_api`, giving full
    /// control over headers, extractors, and response types. Avoid paths
    /// starting with `/_api`, as they conflict with internal routes.
    ///
    /// ```ignore
    /// use axum::{Router, routing::get};
    ///
    /// let routes = Router::new()
    ///     .route("/custom/health", get(|| async { "ok" }));
    ///
    /// builder.custom_routes(routes);
    /// ```
    pub fn custom_routes(&mut self, router: Router) {
        self.custom_routes = Some(router);
    }

    /// Set the configuration.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Get mutable access to the function registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
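    ///
    /// A sketch, assuming `MyTool` is a type implementing [`ForgeMcpTool`]
    /// (see the test module below for a full implementation):
    ///
    /// ```ignore
    /// builder.register_mcp_tool::<MyTool>();
    /// ```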
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Get mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get mutable access to the daemon registry.
    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    /// Get mutable access to the webhook registry.
    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }

    /// Build the FORGE runtime.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;

        let (shutdown_tx, _) = broadcast::channel(1);

        Ok(Forge {
            config,
            db: None,
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            mcp_registry: self.mcp_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            daemon_registry: Arc::new(self.daemon_registry),
            webhook_registry: Arc::new(self.webhook_registry),
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            frontend_handler: self.frontend_handler,
            custom_routes: self.custom_routes,
        })
    }
}

impl Default for ForgeBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Convert config NodeRole to cluster NodeRole.
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Gateway => NodeRole::Gateway,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let mut builder = ForgeBuilder::new();
        builder.register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}