1use std::path::PathBuf;
4use std::sync::Arc;
5
6#[cfg(feature = "gateway")]
7use axum::Router;
8use tokio::sync::broadcast;
9
10use forge_core::cluster::{NodeId, NodeRole};
11use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
12use forge_core::error::{ForgeError, Result};
13use forge_core::function::{ForgeMutation, ForgeQuery};
14use forge_runtime::pg::migration::Migration;
15
16#[cfg(feature = "gateway")]
17use forge_core::mcp::ForgeMcpTool;
18#[cfg(feature = "cron")]
19use forge_runtime::cron::CronRegistry;
20#[cfg(feature = "daemons")]
21use forge_runtime::daemon::DaemonRegistry;
22use forge_runtime::function::FunctionRegistry;
23#[cfg(feature = "jobs")]
24use forge_runtime::jobs::JobRegistry;
25#[cfg(feature = "gateway")]
26use forge_runtime::mcp::McpToolRegistry;
27#[cfg(feature = "gateway")]
28use forge_runtime::webhook::WebhookRegistry;
29#[cfg(feature = "workflows")]
30use forge_runtime::workflow::WorkflowRegistry;
31
32use super::Forge;
33#[cfg(feature = "gateway")]
34use super::FrontendHandler;
35
36pub struct ForgeBuilder {
38 pub(super) config: Option<ForgeConfig>,
39 pub(super) function_registry: FunctionRegistry,
40 #[cfg(feature = "gateway")]
41 pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
42 #[cfg(feature = "gateway")]
43 pub(super) mcp_registry: McpToolRegistry,
44 #[cfg(feature = "jobs")]
45 pub(super) job_registry: JobRegistry,
46 #[cfg(feature = "cron")]
47 pub(super) cron_registry: CronRegistry,
48 #[cfg(feature = "workflows")]
49 pub(super) workflow_registry: WorkflowRegistry,
50 #[cfg(feature = "daemons")]
51 pub(super) daemon_registry: DaemonRegistry,
52 #[cfg(feature = "gateway")]
53 pub(super) webhook_registry: WebhookRegistry,
54 pub(super) migrations_dir: PathBuf,
55 pub(super) extra_migrations: Vec<Migration>,
56 #[cfg(feature = "gateway")]
57 pub(super) frontend_handler: Option<FrontendHandler>,
58 #[cfg(feature = "gateway")]
59 pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
60}
61
62impl ForgeBuilder {
63 pub fn new() -> Self {
64 Self {
65 config: None,
66 function_registry: FunctionRegistry::new(),
67 #[cfg(feature = "gateway")]
68 role_resolver: None,
69 #[cfg(feature = "gateway")]
70 mcp_registry: McpToolRegistry::new(),
71 #[cfg(feature = "jobs")]
72 job_registry: JobRegistry::new(),
73 #[cfg(feature = "cron")]
74 cron_registry: CronRegistry::new(),
75 #[cfg(feature = "workflows")]
76 workflow_registry: WorkflowRegistry::new(),
77 #[cfg(feature = "daemons")]
78 daemon_registry: DaemonRegistry::new(),
79 #[cfg(feature = "gateway")]
80 webhook_registry: WebhookRegistry::new(),
81 migrations_dir: PathBuf::from("migrations"),
82 extra_migrations: Vec::new(),
83 #[cfg(feature = "gateway")]
84 frontend_handler: None,
85 #[cfg(feature = "gateway")]
86 custom_routes_factory: None,
87 }
88 }
89
90 pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
96 self.migrations_dir = path.into();
97 self
98 }
99
100 pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
105 self.extra_migrations.push(Migration::new(name, sql));
106 self
107 }
108
109 #[cfg(feature = "gateway")]
114 pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
115 self.frontend_handler = Some(handler);
116 self
117 }
118
119 #[cfg(feature = "gateway")]
128 pub fn with_role_resolver(mut self, resolver: forge_core::SharedRoleResolver) -> Self {
129 self.role_resolver = Some(resolver);
130 self
131 }
132
133 #[cfg(feature = "gateway")]
167 pub fn custom_routes<F>(mut self, f: F) -> Self
168 where
169 F: FnOnce(sqlx::PgPool) -> Router + Send + Sync + 'static,
170 {
171 self.custom_routes_factory = Some(Box::new(f));
172 self
173 }
174
175 pub fn auto_register(mut self) -> Self {
182 let mut registries = crate::auto_register::HandlerRegistries {
183 functions: std::mem::take(&mut self.function_registry),
184 #[cfg(feature = "jobs")]
185 jobs: std::mem::take(&mut self.job_registry),
186 #[cfg(feature = "cron")]
187 crons: std::mem::take(&mut self.cron_registry),
188 #[cfg(feature = "workflows")]
189 workflows: std::mem::take(&mut self.workflow_registry),
190 #[cfg(feature = "daemons")]
191 daemons: std::mem::take(&mut self.daemon_registry),
192 #[cfg(feature = "gateway")]
193 webhooks: std::mem::take(&mut self.webhook_registry),
194 #[cfg(feature = "gateway")]
195 mcp_tools: std::mem::take(&mut self.mcp_registry),
196 };
197 crate::auto_register::auto_register_all(&mut registries);
198 self.function_registry = registries.functions;
199 #[cfg(feature = "jobs")]
200 {
201 self.job_registry = registries.jobs;
202 }
203 #[cfg(feature = "cron")]
204 {
205 self.cron_registry = registries.crons;
206 }
207 #[cfg(feature = "workflows")]
208 {
209 self.workflow_registry = registries.workflows;
210 }
211 #[cfg(feature = "daemons")]
212 {
213 self.daemon_registry = registries.daemons;
214 }
215 #[cfg(feature = "gateway")]
216 {
217 self.webhook_registry = registries.webhooks;
218 self.mcp_registry = registries.mcp_tools;
219 }
220 self
221 }
222
223 pub fn config(mut self, config: ForgeConfig) -> Self {
224 self.config = Some(config);
225 self
226 }
227
228 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
229 &mut self.function_registry
230 }
231
232 #[cfg(feature = "jobs")]
233 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
234 &mut self.job_registry
235 }
236
237 #[cfg(feature = "gateway")]
238 pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
239 &mut self.mcp_registry
240 }
241
242 #[cfg(feature = "gateway")]
243 pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
244 self.mcp_registry.register::<T>();
245 self
246 }
247
248 #[cfg(feature = "cron")]
249 pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
250 &mut self.cron_registry
251 }
252
253 #[cfg(feature = "workflows")]
254 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
255 &mut self.workflow_registry
256 }
257
258 #[cfg(feature = "daemons")]
259 pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
260 &mut self.daemon_registry
261 }
262
263 #[cfg(feature = "gateway")]
264 pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
265 &mut self.webhook_registry
266 }
267
268 pub fn register_query<Q: ForgeQuery>(mut self) -> Self
269 where
270 Q::Args: serde::de::DeserializeOwned + Send + 'static,
271 Q::Output: serde::Serialize + Send + 'static,
272 {
273 self.function_registry.register_query::<Q>();
274 self
275 }
276
277 pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
278 where
279 M::Args: serde::de::DeserializeOwned + Send + 'static,
280 M::Output: serde::Serialize + Send + 'static,
281 {
282 self.function_registry.register_mutation::<M>();
283 self
284 }
285
286 #[cfg(feature = "jobs")]
287 pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
288 where
289 J::Args: serde::de::DeserializeOwned + Send + 'static,
290 J::Output: serde::Serialize + Send + 'static,
291 {
292 self.job_registry.register::<J>();
293 self
294 }
295
296 #[cfg(feature = "cron")]
297 pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
298 self.cron_registry.register::<C>();
299 self
300 }
301
302 #[cfg(feature = "workflows")]
303 pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
304 where
305 W::Input: serde::de::DeserializeOwned,
306 W::Output: serde::Serialize,
307 {
308 self.workflow_registry.register::<W>();
309 self
310 }
311
312 #[cfg(feature = "daemons")]
313 pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
314 self.daemon_registry.register::<D>();
315 self
316 }
317
318 #[cfg(feature = "gateway")]
319 pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
320 self.webhook_registry.register::<W>();
321 self
322 }
323
324 pub fn build(self) -> Result<Forge> {
325 let config = self
326 .config
327 .ok_or_else(|| ForgeError::config("Configuration is required"))?;
328
329 config.auth.validate()?;
330
331 let (shutdown_tx, _) = broadcast::channel(1);
332
333 #[cfg(feature = "workflows")]
334 let workflow_registry = {
335 let mut reg = self.workflow_registry;
336 reg.signature_check = config.workflow.signature_check;
337 reg
338 };
339
340 Ok(Forge {
341 config,
342 db: None,
343 node_id: NodeId::new(),
344 function_registry: self.function_registry,
345 #[cfg(feature = "gateway")]
346 mcp_registry: self.mcp_registry,
347 #[cfg(feature = "jobs")]
348 job_registry: self.job_registry,
349 #[cfg(feature = "cron")]
350 cron_registry: Arc::new(self.cron_registry),
351 #[cfg(feature = "workflows")]
352 workflow_registry,
353 #[cfg(feature = "daemons")]
354 daemon_registry: Arc::new(self.daemon_registry),
355 #[cfg(feature = "gateway")]
356 webhook_registry: Arc::new(self.webhook_registry),
357 shutdown_tx,
358 migrations_dir: self.migrations_dir,
359 extra_migrations: self.extra_migrations,
360 #[cfg(feature = "gateway")]
361 frontend_handler: self.frontend_handler,
362 #[cfg(feature = "gateway")]
363 custom_routes_factory: self.custom_routes_factory,
364 #[cfg(feature = "gateway")]
365 role_resolver: self.role_resolver,
366 })
367 }
368}
369
370impl Default for ForgeBuilder {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
376#[cfg(unix)]
377pub(super) fn get_hostname() -> String {
378 nix::unistd::gethostname()
379 .map(|h| h.to_string_lossy().to_string())
380 .unwrap_or_else(|_| "unknown".to_string())
381}
382
383#[cfg(not(unix))]
384pub(super) fn get_hostname() -> String {
385 std::env::var("COMPUTERNAME")
386 .or_else(|_| std::env::var("HOSTNAME"))
387 .unwrap_or_else(|_| "unknown".to_string())
388}
389
390pub(super) fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
391 match role {
392 ConfigNodeRole::Gateway => NodeRole::Gateway,
393 ConfigNodeRole::Function => NodeRole::Function,
394 ConfigNodeRole::Worker => NodeRole::Worker,
395 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
396 _ => NodeRole::Function,
399 }
400}