// forge/runtime.rs
1//! FORGE - The Rust Full-Stack Framework
2//!
3//! Single binary runtime that provides:
4//! - HTTP Gateway with RPC endpoints
5//! - SSE server for real-time subscriptions
6//! - Background job workers
7//! - Cron scheduler
8//! - Workflow engine
9//! - Cluster coordination
10
11use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34    ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Type alias for frontend handler function.
///
/// A plain `fn` pointer taking the raw HTTP request and returning a boxed,
/// `Send` future resolving to the response. Installed as the router fallback
/// by `ForgeBuilder::frontend_handler` to serve embedded SPA assets.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
52/// Prelude module for common imports.
53pub mod prelude {
54    // Common types
55    pub use chrono::{DateTime, Utc};
56    pub use uuid::Uuid;
57
58    // Serde re-exports for user code
59    pub use serde::{Deserialize, Serialize};
60    pub use serde_json;
61
62    /// Timestamp type alias for convenience.
63    pub type Timestamp = DateTime<Utc>;
64
65    // Core types
66    pub use forge_core::cluster::NodeRole;
67    pub use forge_core::config::ForgeConfig;
68    pub use forge_core::cron::{CronContext, ForgeCron};
69    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
70    pub use forge_core::env::EnvAccess;
71    pub use forge_core::error::{ForgeError, Result};
72    pub use forge_core::function::{
73        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
74    };
75    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
76    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
77    pub use forge_core::realtime::Delta;
78    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
79    pub use forge_core::schemars::JsonSchema;
80    pub use forge_core::types::Upload;
81    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
82    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
83
84    // Same axum version the runtime uses, avoids type mismatches in custom handlers
85    pub use axum;
86
87    pub use crate::{Forge, ForgeBuilder};
88}
89
/// The main FORGE runtime.
///
/// Holds the configuration, all handler registries, and the shutdown channel.
/// Constructed via [`ForgeBuilder`] and consumed by [`Forge::run`].
pub struct Forge {
    /// Loaded framework configuration (project, gateway, cluster, ...).
    config: ForgeConfig,
    /// Database handle; `None` until [`Forge::run`] connects.
    db: Option<Database>,
    /// This node's cluster identity; replaced in `run` once `NodeInfo` is built.
    node_id: NodeId,
    /// Registered query/mutation functions served by the gateway.
    function_registry: FunctionRegistry,
    /// Registered MCP tools exposed through the gateway.
    mcp_registry: McpToolRegistry,
    /// Registered background job handlers.
    job_registry: JobRegistry,
    /// Registered cron handlers (shared with the cron runner task).
    cron_registry: Arc<CronRegistry>,
    /// Registered workflows (cloned into executor/scheduler tasks).
    workflow_registry: WorkflowRegistry,
    /// Registered daemons (shared with the daemon runner task).
    daemon_registry: Arc<DaemonRegistry>,
    /// Registered webhooks (shared with the webhook router state).
    webhook_registry: Arc<WebhookRegistry>,
    /// Broadcast channel used to signal shutdown to background tasks.
    shutdown_tx: broadcast::Sender<()>,
    /// Path to user migrations directory (default: ./migrations).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional frontend handler for embedded SPA.
    frontend_handler: Option<FrontendHandler>,
    /// Custom axum routes merged into the top-level router.
    custom_routes: Option<Router>,
}
112
impl Forge {
    /// Create a new builder for configuring FORGE.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Get the node ID.
    ///
    /// Note: the ID is regenerated from the freshly built `NodeInfo` early in
    /// [`Forge::run`], so the value observed before `run` is not the one the
    /// cluster ultimately sees.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Get the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Get the function registry mutably.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get the MCP tool registry mutably.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool without manually accessing the registry.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Get the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Get the job registry mutably.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get the cron registry (cheap `Arc` clone).
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Get the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Get the workflow registry mutably.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get the daemon registry (cheap `Arc` clone).
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Get the webhook registry (cheap `Arc` clone).
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Run the FORGE server until shutdown.
    ///
    /// Startup sequence (order matters):
    /// 1. Initialize telemetry, unless the user already installed a subscriber.
    /// 2. Connect to the database and run migrations under an advisory lock.
    /// 3. Register this node in the cluster; schedulers start leader election.
    /// 4. Spawn role-gated background tasks: heartbeat, job worker, cron
    ///    runner, workflow scheduler, daemon runner, HTTP gateway.
    /// 5. Block until ctrl-c or [`Forge::shutdown`], then tear down in reverse
    ///    (scheduler token, cluster shutdown, election, reactor, DB, telemetry).
    ///
    /// # Errors
    ///
    /// Returns an error if the database connection, migration loading or
    /// application, or gateway auth configuration fails.
    pub async fn run(mut self) -> Result<()> {
        // Users shouldn't need tracing_subscriber boilerplate to see logs
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Subscriber already exists, user set one up manually
            }
            Err(e) => {
                // Telemetry failure is non-fatal; fall back to stderr.
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        // Connect to database
        let db =
            Database::from_config_with_service(&self.config.database, &self.config.project.name)
                .await?;
        // Distinct pools: primary for cluster metadata/migrations, plus
        // dedicated jobs and observability pools.
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        if let Some(handle) = db.start_health_monitor() {
            // Let the monitor task end on either shutdown or its own completion.
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Run migrations with mesh-safe locking
        // This acquires an advisory lock, so only one node runs migrations at a time
        let runner = MigrationRunner::new(pool.clone());

        // Load user migrations from directory + any programmatic ones
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Get local node info
        let hostname = get_hostname();

        // NOTE(review): the advertised IP is hard-coded to loopback — confirm
        // multi-host clusters override or discover the real address elsewhere.
        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        // Adopt the identity generated with the node info.
        let node_id = node_info.id;
        self.node_id = node_id;

        // Create node registry
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Register node in cluster
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        // Set node status to active
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Create leader election for scheduler role
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Try to become leader
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Create graceful shutdown coordinator
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Create HTTP client with circuit breaker for actions and crons
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Start background tasks based on roles
        // NOTE(review): these JoinHandles are never awaited or aborted later;
        // spawned tasks are effectively detached and die with the process —
        // confirm that is intentional.
        let mut handles = Vec::new();

        // Start heartbeat loop
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Start leader election loop if scheduler role
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Start job worker if worker role
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Start cron runner if scheduler role and is leader
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                // No election handle means a single-node scheduler: act as
                // leader unconditionally; otherwise defer to the election.
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Start workflow scheduler if scheduler role
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Create job dispatcher and workflow executor for dispatch capabilities
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // Start daemon runner if scheduler role (daemons run as singletons)
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // Reactor handle for shutdown
        let mut reactor_handle = None;

        // Start HTTP gateway if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                sse_max_sessions: self.config.gateway.sse_max_sessions,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // Listing any explicit origin implicitly enables CORS.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
            };

            // Build gateway server (pass Database wrapper for read replica routing)
            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db.clone().expect("Database must be initialized"),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Start the reactor for real-time updates
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            // Build API router (all under /_api)
            let api_router = gateway.router();

            // Build final router with API
            let mut router = Router::new().nest("/_api", api_router);

            // Mount webhook routes under /_api (bypasses gateway auth middleware)
            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // Webhook routes need their own CORS layer since they're outside the API router.
                // Reuse gateway CORS policy rather than forcing wildcard access.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        // Origins that fail to parse are silently dropped.
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    // CORS disabled: a default (permit-nothing-extra) layer.
                    CorsLayer::new()
                };

                // POST-only wildcard route with a 1 MiB body cap, plus
                // concurrency/timeout limits mirroring the gateway settings.
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // Merge custom routes before frontend fallback so they take precedence
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Add frontend handler as fallback if configured
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        // Summarize everything that was registered, for startup diagnostics.
        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // Periodic pool metrics sampler (every 15s).
        // NOTE(review): this task ignores the shutdown channel; it is detached
        // and only ends when the process exits — confirm that is acceptable.
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = self.config.gateway.port,
            "Forge started"
        );

        // Wait for shutdown signal
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        // Graceful shutdown
        tracing::debug!("Graceful shutdown starting");

        // Stop workflow scheduler
        workflow_shutdown_token.cancel();

        // Deregister from the cluster / release leadership via the coordinator.
        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        // Stop leader election
        if let Some(ref election) = leader_election {
            election.stop();
        }

        // Stop reactor before closing database
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        // Close database connections
        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Request shutdown.
    ///
    /// Broadcasts on the shutdown channel; the send result is ignored because
    /// having no live receivers (server not running) is not an error here.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
663
/// Builder for configuring the FORGE runtime.
///
/// Collects configuration and handler registrations, then [`ForgeBuilder::build`]
/// produces a [`Forge`]. Mirrors the fields of `Forge`, but without `Arc`
/// wrappers so registries stay mutable during setup.
pub struct ForgeBuilder {
    /// Required configuration; `build` fails if this is still `None`.
    config: Option<ForgeConfig>,
    /// Query/mutation functions to serve.
    function_registry: FunctionRegistry,
    /// MCP tools to expose.
    mcp_registry: McpToolRegistry,
    /// Background job handlers.
    job_registry: JobRegistry,
    /// Cron handlers.
    cron_registry: CronRegistry,
    /// Workflows.
    workflow_registry: WorkflowRegistry,
    /// Daemons.
    daemon_registry: DaemonRegistry,
    /// Webhooks.
    webhook_registry: WebhookRegistry,
    /// Directory to load SQL migrations from (default: ./migrations).
    migrations_dir: PathBuf,
    /// Migrations supplied programmatically, appended after directory ones.
    extra_migrations: Vec<Migration>,
    /// Optional embedded-SPA fallback handler.
    frontend_handler: Option<FrontendHandler>,
    /// Optional custom axum routes merged at the top level.
    custom_routes: Option<Router>,
}
679
680impl ForgeBuilder {
681    /// Create a new builder.
682    pub fn new() -> Self {
683        Self {
684            config: None,
685            function_registry: FunctionRegistry::new(),
686            mcp_registry: McpToolRegistry::new(),
687            job_registry: JobRegistry::new(),
688            cron_registry: CronRegistry::new(),
689            workflow_registry: WorkflowRegistry::new(),
690            daemon_registry: DaemonRegistry::new(),
691            webhook_registry: WebhookRegistry::new(),
692            migrations_dir: PathBuf::from("migrations"),
693            extra_migrations: Vec::new(),
694            frontend_handler: None,
695            custom_routes: None,
696        }
697    }
698
699    /// Set the directory to load migrations from.
700    ///
701    /// Defaults to `./migrations`. Migration files should be named like:
702    /// - `0001_create_users.sql`
703    /// - `0002_add_posts.sql`
704    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
705        self.migrations_dir = path.into();
706        self
707    }
708
709    /// Add a migration programmatically.
710    ///
711    /// Use this for migrations that need to be generated at runtime,
712    /// or for testing. For most cases, use migration files instead.
713    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
714        self.extra_migrations.push(Migration::new(name, sql));
715        self
716    }
717
718    /// Set a frontend handler for serving embedded SPA assets.
719    ///
720    /// Use with the `embedded-frontend` feature to build a single binary
721    /// that includes both backend and frontend.
722    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
723        self.frontend_handler = Some(handler);
724        self
725    }
726
727    /// Add custom axum routes to the server.
728    ///
729    /// Routes are merged at the top level, outside `/_api`, giving full
730    /// control over headers, extractors, and response types. Avoid paths
731    /// starting with `/_api` as they conflict with internal routes.
732    ///
733    /// ```ignore
734    /// use axum::{Router, routing::get};
735    ///
736    /// let routes = Router::new()
737    ///     .route("/custom/health", get(|| async { "ok" }));
738    ///
739    /// builder.custom_routes(routes);
740    /// ```
741    pub fn custom_routes(mut self, router: Router) -> Self {
742        self.custom_routes = Some(router);
743        self
744    }
745
746    /// Set the configuration.
747    pub fn config(mut self, config: ForgeConfig) -> Self {
748        self.config = Some(config);
749        self
750    }
751
752    /// Get mutable access to the function registry.
753    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
754        &mut self.function_registry
755    }
756
757    /// Get mutable access to the job registry.
758    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
759        &mut self.job_registry
760    }
761
762    /// Get mutable access to the MCP tool registry.
763    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
764        &mut self.mcp_registry
765    }
766
767    /// Register an MCP tool without manually accessing the registry.
768    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
769        self.mcp_registry.register::<T>();
770        self
771    }
772
773    /// Get mutable access to the cron registry.
774    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
775        &mut self.cron_registry
776    }
777
778    /// Get mutable access to the workflow registry.
779    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
780        &mut self.workflow_registry
781    }
782
783    /// Get mutable access to the daemon registry.
784    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
785        &mut self.daemon_registry
786    }
787
788    /// Get mutable access to the webhook registry.
789    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
790        &mut self.webhook_registry
791    }
792
793    /// Register a query function.
794    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
795    where
796        Q::Args: serde::de::DeserializeOwned + Send + 'static,
797        Q::Output: serde::Serialize + Send + 'static,
798    {
799        self.function_registry.register_query::<Q>();
800        self
801    }
802
803    /// Register a mutation function.
804    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
805    where
806        M::Args: serde::de::DeserializeOwned + Send + 'static,
807        M::Output: serde::Serialize + Send + 'static,
808    {
809        self.function_registry.register_mutation::<M>();
810        self
811    }
812
813    /// Register a background job.
814    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
815    where
816        J::Args: serde::de::DeserializeOwned + Send + 'static,
817        J::Output: serde::Serialize + Send + 'static,
818    {
819        self.job_registry.register::<J>();
820        self
821    }
822
823    /// Register a cron handler.
824    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
825        self.cron_registry.register::<C>();
826        self
827    }
828
829    /// Register a workflow.
830    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
831    where
832        W::Input: serde::de::DeserializeOwned,
833        W::Output: serde::Serialize,
834    {
835        self.workflow_registry.register::<W>();
836        self
837    }
838
839    /// Register a daemon.
840    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
841        self.daemon_registry.register::<D>();
842        self
843    }
844
845    /// Register a webhook.
846    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
847        self.webhook_registry.register::<W>();
848        self
849    }
850
851    /// Build the FORGE runtime.
852    pub fn build(self) -> Result<Forge> {
853        let config = self
854            .config
855            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
856
857        let (shutdown_tx, _) = broadcast::channel(1);
858
859        Ok(Forge {
860            config,
861            db: None,
862            node_id: NodeId::new(),
863            function_registry: self.function_registry,
864            mcp_registry: self.mcp_registry,
865            job_registry: self.job_registry,
866            cron_registry: Arc::new(self.cron_registry),
867            workflow_registry: self.workflow_registry,
868            daemon_registry: Arc::new(self.daemon_registry),
869            webhook_registry: Arc::new(self.webhook_registry),
870            shutdown_tx,
871            migrations_dir: self.migrations_dir,
872            extra_migrations: self.extra_migrations,
873            frontend_handler: self.frontend_handler,
874            custom_routes: self.custom_routes,
875        })
876    }
877}
878
879impl Default for ForgeBuilder {
880    fn default() -> Self {
881        Self::new()
882    }
883}
884
885#[cfg(unix)]
886fn get_hostname() -> String {
887    nix::unistd::gethostname()
888        .map(|h| h.to_string_lossy().to_string())
889        .unwrap_or_else(|_| "unknown".to_string())
890}
891
/// Best-effort hostname lookup on non-unix platforms: tries the
/// `COMPUTERNAME` env var (Windows) first, then `HOSTNAME`, and falls
/// back to `"unknown"` when neither is set.
#[cfg(not(unix))]
fn get_hostname() -> String {
    for var in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(var) {
            return name;
        }
    }
    "unknown".to_string()
}
898
899/// Convert config NodeRole to cluster NodeRole.
900fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
901    match role {
902        ConfigNodeRole::Gateway => NodeRole::Gateway,
903        ConfigNodeRole::Function => NodeRole::Function,
904        ConfigNodeRole::Worker => NodeRole::Worker,
905        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
906    }
907}
908
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool used to exercise builder registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    #[test]
    fn test_forge_builder_new() {
        // A fresh builder starts with no configuration.
        assert!(ForgeBuilder::new().config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        // Building without a config must fail.
        assert!(ForgeBuilder::new().build().is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let cfg = ForgeConfig::default_with_database_url("postgres://localhost/test");
        assert!(ForgeBuilder::new().config(cfg).build().is_ok());
    }

    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let with_tool = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(with_tool.mcp_registry.len(), 1);
    }

    #[test]
    fn test_config_role_conversion() {
        // Table-driven check of every config-role -> cluster-role mapping.
        let cases = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Function, NodeRole::Function),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
        ];
        for (config_role, expected) in cases {
            assert_eq!(config_role_to_node_role(&config_role), expected);
        }
    }
}