1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34 ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Handler type for serving frontend (non-`/_api`) requests: a plain function
/// pointer returning a boxed future. When set via `ForgeBuilder::frontend_handler`
/// it is installed as the gateway router's fallback route.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
/// Convenience re-exports for application code: `use forge::prelude::*;` brings
/// in the traits, context types, and common third-party items needed to define
/// queries, mutations, jobs, crons, workflows, daemons, webhooks, and MCP tools.
pub mod prelude {
    // Common third-party time/id types.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serialization support used by function argument/output types.
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Convenient alias for a UTC timestamp.
    pub type Timestamp = DateTime<Utc>;

    // Core traits and context types for each Forge extension point.
    pub use forge_core::auth::TokenPair;
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Re-exported so apps can build custom routers without depending on axum directly.
    pub use axum;

    // The crate's own entry points.
    pub use crate::{Forge, ForgeBuilder};
}
90
/// The assembled Forge application: configuration plus every registry of
/// user-provided handlers, ready to be started with [`Forge::run`].
///
/// Construct via [`Forge::builder`] (i.e. [`ForgeBuilder`]).
pub struct Forge {
    // Full application configuration (project, database, gateway, cluster, …).
    config: ForgeConfig,
    // Connected database handle; `None` until `run` connects.
    db: Option<Database>,
    // This node's cluster identity; replaced in `run` with the registered node's id.
    node_id: NodeId,
    // Query/mutation functions served by the gateway.
    function_registry: FunctionRegistry,
    // MCP tools exposed through the gateway.
    mcp_registry: McpToolRegistry,
    // Background job handlers executed by worker nodes.
    job_registry: JobRegistry,
    // Scheduled cron entries; Arc because it is shared with the cron runner task.
    cron_registry: Arc<CronRegistry>,
    // Workflow definitions for the workflow scheduler/executor.
    workflow_registry: WorkflowRegistry,
    // Long-running daemons; Arc because it is shared with the daemon runner task.
    daemon_registry: Arc<DaemonRegistry>,
    // Inbound webhook handlers, mounted under `/_api/webhooks` by `run`.
    webhook_registry: Arc<WebhookRegistry>,
    // Broadcast channel used to fan a shutdown signal out to spawned tasks.
    shutdown_tx: broadcast::Sender<()>,
    // Directory from which SQL migrations are loaded at startup.
    migrations_dir: PathBuf,
    // Extra migrations registered programmatically via `ForgeBuilder::migration`.
    extra_migrations: Vec<Migration>,
    // Optional fallback handler used to serve the frontend.
    frontend_handler: Option<FrontendHandler>,
    // Optional user-provided routes merged into the gateway router.
    custom_routes: Option<Router>,
}
113
114impl Forge {
    /// Create a new [`ForgeBuilder`] for assembling a `Forge` instance.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// This node's cluster identity (a placeholder until [`Forge::run`] registers it).
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// The application configuration this instance was built with.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Registered query/mutation functions.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Mutable access to the function registry, e.g. for late registration.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool type; chainable via the returned `&mut Self`.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Registered background job handlers.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Shared handle to the cron registry (cloned `Arc`, cheap).
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Registered workflow definitions.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Shared handle to the daemon registry (cloned `Arc`, cheap).
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Shared handle to the webhook registry (cloned `Arc`, cheap).
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }
185
    /// Start every subsystem selected by this node's configured roles and block
    /// until shutdown (ctrl-c or [`Forge::shutdown`]).
    ///
    /// Startup order: telemetry → database (+ health monitor) → migrations →
    /// cluster registration / leader election → background loops (heartbeat,
    /// job worker, cron runner, workflow scheduler, daemon runner) → HTTP
    /// gateway. Teardown reverses it: cancel the scheduler token, run graceful
    /// cluster shutdown, stop leader election and the reactor, close the DB,
    /// flush telemetry.
    ///
    /// # Errors
    /// Fails if the database connection, migration load/run, or gateway auth
    /// configuration is invalid. Most other subsystem errors (node
    /// registration, leadership acquisition, telemetry) are logged and
    /// tolerated so a partially-degraded node can still come up.
    pub async fn run(mut self) -> Result<()> {
        // --- Telemetry ---------------------------------------------------
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // NOTE(review): `Ok(false)` presumably means "telemetry disabled
                // by configuration" — intentionally a no-op. TODO confirm.
            }
            Err(e) => {
                // Telemetry failure is non-fatal: report and keep starting up.
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        // --- Database ----------------------------------------------------
        let db =
            Database::from_config_with_service(&self.config.database, &self.config.project.name)
                .await?;
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        if let Some(handle) = db.start_health_monitor() {
            // Keep the health monitor alive until either it finishes on its own
            // or the shutdown signal fires.
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // --- Migrations --------------------------------------------------
        let runner = MigrationRunner::new(pool.clone());

        // Directory-based migrations first, then programmatic extras from the builder.
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // --- Cluster identity & registration -----------------------------
        let hostname = get_hostname();

        // NOTE(review): the advertised IP is hard-coded to loopback — confirm
        // multi-host deployments override/ignore this address.
        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        // Overwrite the placeholder id assigned in `build()` with the real local id.
        let node_id = node_info.id;
        self.node_id = node_id;

        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Registration is best-effort: tables may not exist on a fresh install.
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Only scheduler nodes participate in leader election.
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Failing to acquire leadership just means another node leads.
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Shared outbound HTTP client with circuit breaking.
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Handles of all long-running background tasks spawned below.
        let mut handles = Vec::new();

        // --- Heartbeat loop (all nodes) -----------------------------------
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // --- Leader-election maintenance loop (scheduler nodes) -----------
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // --- Job worker (worker nodes) ------------------------------------
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // --- Cron runner (scheduler nodes) --------------------------------
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                // Act as leader outright only when there is no election to defer
                // to; scheduler nodes always have one here, so leadership is
                // delegated to `leader_election`.
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // --- Workflow scheduler (scheduler nodes) -------------------------
        // Cancellation token lets teardown stop the scheduler explicitly.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // --- Dispatchers shared by the gateway, daemons, and webhooks -----
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // --- Daemon runner (scheduler nodes with registered daemons) ------
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // Kept alive so teardown can stop the reactor after the gateway starts it.
        let mut reactor_handle = None;

        // --- HTTP gateway (gateway nodes) ---------------------------------
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                sse_max_sessions: self.config.gateway.sse_max_sessions,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // Listing any origin implicitly enables CORS.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
                token_ttl: forge_core::AuthTokenTtl {
                    access_token_secs: self.config.auth.access_token_ttl_secs(),
                    refresh_token_days: self.config.auth.refresh_token_ttl_days(),
                },
                project_name: self.config.project.name.clone(),
            };

            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db
                    .clone()
                    .ok_or_else(|| ForgeError::Internal("Database not initialized".into()))?,
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Reactor start failure is tolerated; the gateway still serves.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            let api_router = gateway.router();

            let mut router = Router::new().nest("/_api", api_router);

            // Webhook routes get their own limits/timeouts/CORS, independent of
            // the main API router.
            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        // Wildcard origin: allow everything.
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        // Unparseable origins are silently dropped.
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    // CORS disabled: a default (permitting nothing extra) layer.
                    CorsLayer::new()
                };

                // Catch-all POST route; bodies capped at 1 MiB, requests bounded
                // by the gateway's concurrency limit and timeout.
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // OAuth discovery endpoints for MCP clients.
            if self.config.mcp.enabled {
                use axum::routing::get;

                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
                    router = router.nest("/_api", oauth_api_router);

                    // RFC 8414 / RFC 9728 well-known metadata endpoints.
                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
                                .with_state(oauth_state.clone()),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
                                .with_state(oauth_state),
                        );

                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
                } else {
                    // No OAuth support: answer discovery probes with an explicit
                    // 404 payload so clients fall back to unauthenticated mode.
                    async fn oauth_not_supported() -> impl axum::response::IntoResponse {
                        (
                            axum::http::StatusCode::NOT_FOUND,
                            axum::Json(serde_json::json!({
                                "error": "oauth_not_supported",
                                "error_description": "This server does not support OAuth. Connect without authentication."
                            })),
                        )
                    }
                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(oauth_not_supported),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(oauth_not_supported),
                        );
                }
            }

            // User-supplied routes merge after the built-in ones.
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Frontend handler becomes the fallback for anything unmatched.
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // --- Periodic pool metrics (every 15s, detached task) -------------
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = self.config.gateway.port,
            "Forge started"
        );

        // --- Block until shutdown is requested ----------------------------
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        tracing::debug!("Graceful shutdown starting");

        // --- Teardown ------------------------------------------------------
        workflow_shutdown_token.cancel();

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        if let Some(ref election) = leader_election {
            election.stop();
        }

        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }
710
711 pub fn shutdown(&self) {
713 let _ = self.shutdown_tx.send(());
714 }
715}
716
/// Builder for [`Forge`]: collects configuration, registrations, migrations,
/// and optional routing before [`ForgeBuilder::build`] assembles the instance.
pub struct ForgeBuilder {
    // Required; `build` fails without it.
    config: Option<ForgeConfig>,
    // Registries populated via the `register_*` methods or `auto_register`.
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    // Defaults to "migrations"; see `migrations_dir`.
    migrations_dir: PathBuf,
    // Migrations added programmatically via `migration`.
    extra_migrations: Vec<Migration>,
    // Optional frontend fallback handler and extra axum routes.
    frontend_handler: Option<FrontendHandler>,
    custom_routes: Option<Router>,
}
732
733impl ForgeBuilder {
734 pub fn new() -> Self {
736 Self {
737 config: None,
738 function_registry: FunctionRegistry::new(),
739 mcp_registry: McpToolRegistry::new(),
740 job_registry: JobRegistry::new(),
741 cron_registry: CronRegistry::new(),
742 workflow_registry: WorkflowRegistry::new(),
743 daemon_registry: DaemonRegistry::new(),
744 webhook_registry: WebhookRegistry::new(),
745 migrations_dir: PathBuf::from("migrations"),
746 extra_migrations: Vec::new(),
747 frontend_handler: None,
748 custom_routes: None,
749 }
750 }
751
    /// Override the directory SQL migrations are loaded from (default: `migrations`).
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a single named SQL migration programmatically, applied after the
    /// directory-based migrations.
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Install a handler used as the gateway's fallback route for frontend requests.
    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
        self.frontend_handler = Some(handler);
        self
    }

    /// Merge additional axum routes into the gateway router at startup.
    pub fn custom_routes(mut self, router: Router) -> Self {
        self.custom_routes = Some(router);
        self
    }

    /// Populate every registry from the crate's auto-registration machinery
    /// (functions, jobs, crons, workflows, daemons, webhooks, MCP tools).
    pub fn auto_register(mut self) -> Self {
        crate::auto_register::auto_register_all(
            &mut self.function_registry,
            &mut self.job_registry,
            &mut self.cron_registry,
            &mut self.workflow_registry,
            &mut self.daemon_registry,
            &mut self.webhook_registry,
            &mut self.mcp_registry,
        );
        self
    }

    /// Set the application configuration (required before [`ForgeBuilder::build`]).
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }
823
    /// Mutable access to the function registry for manual registration.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool type (consuming-builder form).
    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Mutable access to the daemon registry.
    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    /// Mutable access to the webhook registry.
    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }
864
    /// Register a query function; its args must deserialize and output serialize.
    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
    where
        Q::Args: serde::de::DeserializeOwned + Send + 'static,
        Q::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_query::<Q>();
        self
    }

    /// Register a mutation function.
    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
    where
        M::Args: serde::de::DeserializeOwned + Send + 'static,
        M::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_mutation::<M>();
        self
    }

    /// Register a background job handler.
    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
    where
        J::Args: serde::de::DeserializeOwned + Send + 'static,
        J::Output: serde::Serialize + Send + 'static,
    {
        self.job_registry.register::<J>();
        self
    }

    /// Register a cron entry.
    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
        self.cron_registry.register::<C>();
        self
    }

    /// Register a workflow definition.
    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
    where
        W::Input: serde::de::DeserializeOwned,
        W::Output: serde::Serialize,
    {
        self.workflow_registry.register::<W>();
        self
    }

    /// Register a long-running daemon.
    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
        self.daemon_registry.register::<D>();
        self
    }

    /// Register an inbound webhook handler.
    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
        self.webhook_registry.register::<W>();
        self
    }
922
923 pub fn build(self) -> Result<Forge> {
925 let config = self
926 .config
927 .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
928
929 let (shutdown_tx, _) = broadcast::channel(1);
930
931 Ok(Forge {
932 config,
933 db: None,
934 node_id: NodeId::new(),
935 function_registry: self.function_registry,
936 mcp_registry: self.mcp_registry,
937 job_registry: self.job_registry,
938 cron_registry: Arc::new(self.cron_registry),
939 workflow_registry: self.workflow_registry,
940 daemon_registry: Arc::new(self.daemon_registry),
941 webhook_registry: Arc::new(self.webhook_registry),
942 shutdown_tx,
943 migrations_dir: self.migrations_dir,
944 extra_migrations: self.extra_migrations,
945 frontend_handler: self.frontend_handler,
946 custom_routes: self.custom_routes,
947 })
948 }
949}
950
951impl Default for ForgeBuilder {
952 fn default() -> Self {
953 Self::new()
954 }
955}
956
957#[cfg(unix)]
958fn get_hostname() -> String {
959 nix::unistd::gethostname()
960 .map(|h| h.to_string_lossy().to_string())
961 .unwrap_or_else(|_| "unknown".to_string())
962}
963
/// Best-effort local hostname on non-unix targets: first readable value among
/// the `COMPUTERNAME` and `HOSTNAME` environment variables, else `"unknown"`.
#[cfg(not(unix))]
fn get_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown".to_string()
}
970
/// Map a config-file `NodeRole` variant to its runtime cluster counterpart
/// (one-to-one; exhaustive so new variants force a compile error here).
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Gateway => NodeRole::Gateway,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
    }
}
980
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    // Minimal MCP tool used to exercise builder registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    // A fresh builder carries no configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    // build() must fail when no config was supplied.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    // build() succeeds once a config is provided.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    // Registering a tool populates the MCP registry.
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    // Every config role maps to its matching cluster role.
    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}