Skip to main content

forge/runtime/
builder.rs

1//! Builder for configuring the FORGE runtime.
2
3use std::path::PathBuf;
4use std::sync::Arc;
5
6#[cfg(feature = "gateway")]
7use axum::Router;
8use tokio::sync::broadcast;
9
10use forge_core::cluster::{NodeId, NodeRole};
11use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
12use forge_core::error::{ForgeError, Result};
13use forge_core::function::{ForgeMutation, ForgeQuery};
14use forge_runtime::pg::migration::Migration;
15
16#[cfg(feature = "gateway")]
17use forge_core::mcp::ForgeMcpTool;
18#[cfg(feature = "cron")]
19use forge_runtime::cron::CronRegistry;
20#[cfg(feature = "daemons")]
21use forge_runtime::daemon::DaemonRegistry;
22use forge_runtime::function::FunctionRegistry;
23#[cfg(feature = "jobs")]
24use forge_runtime::jobs::JobRegistry;
25#[cfg(feature = "gateway")]
26use forge_runtime::mcp::McpToolRegistry;
27#[cfg(feature = "gateway")]
28use forge_runtime::webhook::WebhookRegistry;
29#[cfg(feature = "workflows")]
30use forge_runtime::workflow::WorkflowRegistry;
31
32use super::Forge;
33#[cfg(feature = "gateway")]
34use super::FrontendHandler;
35
/// Builder for configuring the FORGE runtime.
///
/// Collects the configuration, handler registries, migrations, and (with the
/// `gateway` feature) HTTP extras, then hands them to [`Forge`] in `build()`.
/// Several fields only exist when the matching Cargo feature is enabled.
pub struct ForgeBuilder {
    /// Runtime configuration set through `config()`. `build()` returns a
    /// configuration error while this is still `None`.
    pub(super) config: Option<ForgeConfig>,
    /// Registry filled by `register_query`, `register_mutation`, and
    /// `auto_register`.
    pub(super) function_registry: FunctionRegistry,
    /// Custom role resolver installed by `with_role_resolver`, if any.
    #[cfg(feature = "gateway")]
    pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
    /// Registry of MCP tools added via `register_mcp_tool` or `auto_register`.
    #[cfg(feature = "gateway")]
    pub(super) mcp_registry: McpToolRegistry,
    /// Registry of jobs added via `register_job` or `auto_register`.
    #[cfg(feature = "jobs")]
    pub(super) job_registry: JobRegistry,
    /// Registry of cron handlers added via `register_cron` or `auto_register`.
    #[cfg(feature = "cron")]
    pub(super) cron_registry: CronRegistry,
    /// Registry of workflows added via `register_workflow` or `auto_register`.
    /// `build()` copies `config.workflow.signature_check` into it.
    #[cfg(feature = "workflows")]
    pub(super) workflow_registry: WorkflowRegistry,
    /// Registry of daemons added via `register_daemon` or `auto_register`.
    #[cfg(feature = "daemons")]
    pub(super) daemon_registry: DaemonRegistry,
    /// Registry of webhooks added via `register_webhook` or `auto_register`.
    #[cfg(feature = "gateway")]
    pub(super) webhook_registry: WebhookRegistry,
    /// Directory migration files are loaded from; starts out as `migrations`.
    pub(super) migrations_dir: PathBuf,
    /// Migrations added programmatically through `migration()`.
    pub(super) extra_migrations: Vec<Migration>,
    /// Handler for serving embedded SPA assets, set by `frontend_handler()`.
    #[cfg(feature = "gateway")]
    pub(super) frontend_handler: Option<FrontendHandler>,
    /// One-shot factory that builds extra axum routes from the database pool,
    /// set by `custom_routes()`.
    #[cfg(feature = "gateway")]
    pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
}
61
62impl ForgeBuilder {
63    pub fn new() -> Self {
64        Self {
65            config: None,
66            function_registry: FunctionRegistry::new(),
67            #[cfg(feature = "gateway")]
68            role_resolver: None,
69            #[cfg(feature = "gateway")]
70            mcp_registry: McpToolRegistry::new(),
71            #[cfg(feature = "jobs")]
72            job_registry: JobRegistry::new(),
73            #[cfg(feature = "cron")]
74            cron_registry: CronRegistry::new(),
75            #[cfg(feature = "workflows")]
76            workflow_registry: WorkflowRegistry::new(),
77            #[cfg(feature = "daemons")]
78            daemon_registry: DaemonRegistry::new(),
79            #[cfg(feature = "gateway")]
80            webhook_registry: WebhookRegistry::new(),
81            migrations_dir: PathBuf::from("migrations"),
82            extra_migrations: Vec::new(),
83            #[cfg(feature = "gateway")]
84            frontend_handler: None,
85            #[cfg(feature = "gateway")]
86            custom_routes_factory: None,
87        }
88    }
89
90    /// Set the directory to load migrations from.
91    ///
92    /// Defaults to `./migrations`. Migration files should be named like:
93    /// - `0001_create_users.sql`
94    /// - `0002_add_posts.sql`
95    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
96        self.migrations_dir = path.into();
97        self
98    }
99
100    /// Add a migration programmatically.
101    ///
102    /// Use this for migrations that need to be generated at runtime,
103    /// or for testing. For most cases, use migration files instead.
104    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
105        self.extra_migrations.push(Migration::new(name, sql));
106        self
107    }
108
109    /// Set a frontend handler for serving embedded SPA assets.
110    ///
111    /// Use with the `embedded-frontend` feature to build a single binary
112    /// that includes both backend and frontend.
113    #[cfg(feature = "gateway")]
114    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
115        self.frontend_handler = Some(handler);
116        self
117    }
118
119    /// Plug in a custom role resolver for RBAC extension.
120    ///
121    /// The default resolver returns the flat `roles` JWT claim as-is. Use
122    /// this to expand roles hierarchically, perform group-membership lookups,
123    /// or consult an external permission service.
124    ///
125    /// The resolver is called for every request that carries a `require_role`
126    /// constraint. Keep it cheap — cache remote lookups internally.
127    #[cfg(feature = "gateway")]
128    pub fn with_role_resolver(mut self, resolver: forge_core::SharedRoleResolver) -> Self {
129        self.role_resolver = Some(resolver);
130        self
131    }
132
133    /// Register custom axum routes built from Forge's managed pool.
134    ///
135    /// The factory runs once during `run()`, after the database pool is
136    /// connected. The returned router is merged into the gateway's `/_api`
137    /// namespace, so every route receives the full middleware stack: auth
138    /// (JWT), CORS, tracing, concurrency limits, and timeouts.
139    ///
140    /// Route paths are relative to `/_api`. Registering `/export/csv`
141    /// exposes `GET /_api/export/csv`. Avoid paths that collide with
142    /// built-ins under `/_api`: `/health`, `/ready`, `/rpc`, `/rpc/*`,
143    /// `/events`, `/subscribe`, `/unsubscribe`, `/subscribe-job`,
144    /// `/subscribe-workflow`, `/signal/*`, `/mcp`, and `/oauth/*`.
145    ///
146    /// The factory receives the framework's `sqlx::PgPool`. Cloning it is
147    /// cheap (`PgPool` is internally an `Arc`).
148    ///
149    /// If your handlers don't need the pool, ignore the argument:
150    ///
151    /// ```ignore
152    /// builder.custom_routes(|_| Router::new().route("/healthz", get(|| async { "ok" })));
153    /// ```
154    ///
155    /// With pool access:
156    ///
157    /// ```ignore
158    /// use axum::{Router, routing::get};
159    ///
160    /// builder.custom_routes(|pool| {
161    ///     Router::new()
162    ///         .route("/export/csv", get(export_handler))
163    ///         .with_state(pool)
164    /// });
165    /// ```
166    #[cfg(feature = "gateway")]
167    pub fn custom_routes<F>(mut self, f: F) -> Self
168    where
169        F: FnOnce(sqlx::PgPool) -> Router + Send + Sync + 'static,
170    {
171        self.custom_routes_factory = Some(Box::new(f));
172        self
173    }
174
175    /// Automatically register all functions discovered via `#[forge::query]`,
176    /// `#[forge::mutation]`, `#[forge::job]`, `#[forge::cron]`, `#[forge::workflow]`,
177    /// `#[forge::daemon]`, `#[forge::webhook]`, and `#[forge::mcp_tool]` macros.
178    ///
179    /// This replaces the need to manually call `.register_query::<T>()` etc.
180    /// for every function in your application.
181    pub fn auto_register(mut self) -> Self {
182        let mut registries = crate::auto_register::HandlerRegistries {
183            functions: std::mem::take(&mut self.function_registry),
184            #[cfg(feature = "jobs")]
185            jobs: std::mem::take(&mut self.job_registry),
186            #[cfg(feature = "cron")]
187            crons: std::mem::take(&mut self.cron_registry),
188            #[cfg(feature = "workflows")]
189            workflows: std::mem::take(&mut self.workflow_registry),
190            #[cfg(feature = "daemons")]
191            daemons: std::mem::take(&mut self.daemon_registry),
192            #[cfg(feature = "gateway")]
193            webhooks: std::mem::take(&mut self.webhook_registry),
194            #[cfg(feature = "gateway")]
195            mcp_tools: std::mem::take(&mut self.mcp_registry),
196        };
197        crate::auto_register::auto_register_all(&mut registries);
198        self.function_registry = registries.functions;
199        #[cfg(feature = "jobs")]
200        {
201            self.job_registry = registries.jobs;
202        }
203        #[cfg(feature = "cron")]
204        {
205            self.cron_registry = registries.crons;
206        }
207        #[cfg(feature = "workflows")]
208        {
209            self.workflow_registry = registries.workflows;
210        }
211        #[cfg(feature = "daemons")]
212        {
213            self.daemon_registry = registries.daemons;
214        }
215        #[cfg(feature = "gateway")]
216        {
217            self.webhook_registry = registries.webhooks;
218            self.mcp_registry = registries.mcp_tools;
219        }
220        self
221    }
222
223    pub fn config(mut self, config: ForgeConfig) -> Self {
224        self.config = Some(config);
225        self
226    }
227
228    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
229        &mut self.function_registry
230    }
231
232    #[cfg(feature = "jobs")]
233    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
234        &mut self.job_registry
235    }
236
237    #[cfg(feature = "gateway")]
238    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
239        &mut self.mcp_registry
240    }
241
242    #[cfg(feature = "gateway")]
243    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
244        self.mcp_registry.register::<T>();
245        self
246    }
247
248    #[cfg(feature = "cron")]
249    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
250        &mut self.cron_registry
251    }
252
253    #[cfg(feature = "workflows")]
254    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
255        &mut self.workflow_registry
256    }
257
258    #[cfg(feature = "daemons")]
259    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
260        &mut self.daemon_registry
261    }
262
263    #[cfg(feature = "gateway")]
264    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
265        &mut self.webhook_registry
266    }
267
268    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
269    where
270        Q::Args: serde::de::DeserializeOwned + Send + 'static,
271        Q::Output: serde::Serialize + Send + 'static,
272    {
273        self.function_registry.register_query::<Q>();
274        self
275    }
276
277    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
278    where
279        M::Args: serde::de::DeserializeOwned + Send + 'static,
280        M::Output: serde::Serialize + Send + 'static,
281    {
282        self.function_registry.register_mutation::<M>();
283        self
284    }
285
286    #[cfg(feature = "jobs")]
287    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
288    where
289        J::Args: serde::de::DeserializeOwned + Send + 'static,
290        J::Output: serde::Serialize + Send + 'static,
291    {
292        self.job_registry.register::<J>();
293        self
294    }
295
296    #[cfg(feature = "cron")]
297    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
298        self.cron_registry.register::<C>();
299        self
300    }
301
302    #[cfg(feature = "workflows")]
303    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
304    where
305        W::Input: serde::de::DeserializeOwned,
306        W::Output: serde::Serialize,
307    {
308        self.workflow_registry.register::<W>();
309        self
310    }
311
312    #[cfg(feature = "daemons")]
313    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
314        self.daemon_registry.register::<D>();
315        self
316    }
317
318    #[cfg(feature = "gateway")]
319    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
320        self.webhook_registry.register::<W>();
321        self
322    }
323
324    pub fn build(self) -> Result<Forge> {
325        let config = self
326            .config
327            .ok_or_else(|| ForgeError::config("Configuration is required"))?;
328
329        config.auth.validate()?;
330
331        let (shutdown_tx, _) = broadcast::channel(1);
332
333        #[cfg(feature = "workflows")]
334        let workflow_registry = {
335            let mut reg = self.workflow_registry;
336            reg.signature_check = config.workflow.signature_check;
337            reg
338        };
339
340        Ok(Forge {
341            config,
342            db: None,
343            node_id: NodeId::new(),
344            function_registry: self.function_registry,
345            #[cfg(feature = "gateway")]
346            mcp_registry: self.mcp_registry,
347            #[cfg(feature = "jobs")]
348            job_registry: self.job_registry,
349            #[cfg(feature = "cron")]
350            cron_registry: Arc::new(self.cron_registry),
351            #[cfg(feature = "workflows")]
352            workflow_registry,
353            #[cfg(feature = "daemons")]
354            daemon_registry: Arc::new(self.daemon_registry),
355            #[cfg(feature = "gateway")]
356            webhook_registry: Arc::new(self.webhook_registry),
357            shutdown_tx,
358            migrations_dir: self.migrations_dir,
359            extra_migrations: self.extra_migrations,
360            #[cfg(feature = "gateway")]
361            frontend_handler: self.frontend_handler,
362            #[cfg(feature = "gateway")]
363            custom_routes_factory: self.custom_routes_factory,
364            #[cfg(feature = "gateway")]
365            role_resolver: self.role_resolver,
366        })
367    }
368}
369
370impl Default for ForgeBuilder {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
376#[cfg(unix)]
377pub(super) fn get_hostname() -> String {
378    nix::unistd::gethostname()
379        .map(|h| h.to_string_lossy().to_string())
380        .unwrap_or_else(|_| "unknown".to_string())
381}
382
/// Host name taken from the environment on non-unix targets.
///
/// Reads `COMPUTERNAME` first and only consults `HOSTNAME` when that lookup
/// fails; if neither yields a value the literal `"unknown"` is returned.
#[cfg(not(unix))]
pub(super) fn get_hostname() -> String {
    ["COMPUTERNAME", "HOSTNAME"]
        .iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| String::from("unknown"))
}
389
390pub(super) fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
391    match role {
392        ConfigNodeRole::Gateway => NodeRole::Gateway,
393        ConfigNodeRole::Function => NodeRole::Function,
394        ConfigNodeRole::Worker => NodeRole::Worker,
395        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
396        // ConfigNodeRole is #[non_exhaustive]; default unknown future roles to
397        // Function so the node can still serve RPCs while the runtime catches up.
398        _ => NodeRole::Function,
399    }
400}