1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34 ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Signature for an optional frontend fallback handler: a plain function that
/// receives the raw HTTP request and returns a boxed future resolving to the
/// response. Installed via [`ForgeBuilder::frontend_handler`] and mounted as
/// the gateway router's fallback route.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
/// Convenience prelude: `use forge::prelude::*;` brings in the types most
/// applications need to define functions, jobs, crons, workflows, daemons,
/// webhooks, and MCP tools.
pub mod prelude {
    // Common time/id primitives re-exported from their crates.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serialization derives and the `serde_json` crate itself.
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Shorthand for the UTC timestamps used throughout Forge.
    pub type Timestamp = DateTime<Utc>;

    // Core trait and context types for each Forge extension point.
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Re-export axum so apps can build custom routes without adding a dep.
    pub use axum;

    pub use crate::{Forge, ForgeBuilder};
}
89
/// A fully-configured Forge runtime instance.
///
/// Built via [`ForgeBuilder`]; [`Forge::run`] connects to the database,
/// applies migrations, registers the node in the cluster, starts the
/// role-appropriate background services, and blocks until shutdown.
pub struct Forge {
    // Application configuration (project, database, gateway, node, …).
    config: ForgeConfig,
    // Populated during `run()` once the database connection is established.
    db: Option<Database>,
    // This node's cluster identity; reassigned in `run()` from the NodeInfo.
    node_id: NodeId,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: Arc<CronRegistry>,
    workflow_registry: WorkflowRegistry,
    daemon_registry: Arc<DaemonRegistry>,
    webhook_registry: Arc<WebhookRegistry>,
    // Broadcast channel used to signal shutdown to `run()` and spawned tasks.
    shutdown_tx: broadcast::Sender<()>,
    // Directory scanned for SQL migrations at startup.
    migrations_dir: PathBuf,
    // Migrations registered programmatically, appended after the directory set.
    extra_migrations: Vec<Migration>,
    // Optional fallback handler serving a frontend from the gateway router.
    frontend_handler: Option<FrontendHandler>,
    // Optional extra axum routes merged into the gateway router.
    custom_routes: Option<Router>,
}
112
impl Forge {
    /// Create a new [`ForgeBuilder`] for configuring a runtime.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// This node's cluster identifier.
    ///
    /// NOTE(review): before `run()` is called this is the placeholder id
    /// assigned in `ForgeBuilder::build`; `run()` replaces it with the id of
    /// the registered `NodeInfo`.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// The configuration this instance was built with.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Read access to the query/mutation registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Mutable access to the query/mutation registry (pre-`run` only is
    /// meaningful, since `run` consumes `self`).
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool type; chainable.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Read access to the background-job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Mutable access to the background-job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Shared handle to the cron registry.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Read access to the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Shared handle to the daemon registry.
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Shared handle to the webhook registry.
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Start the runtime and block until shutdown.
    ///
    /// Sequence: telemetry → database + migrations → cluster registration →
    /// leader election (scheduler role) → background services per role
    /// (heartbeat, worker, cron, workflow scheduler, daemons) → gateway HTTP
    /// server → wait for ctrl-c or [`Forge::shutdown`] → graceful teardown.
    ///
    /// # Errors
    /// Returns early if the database connection, migration load/apply, or
    /// gateway auth configuration fails. Most cluster bookkeeping failures
    /// are logged and tolerated instead.
    pub async fn run(mut self) -> Result<()> {
        // Telemetry is best-effort: a failure is reported on stderr (tracing
        // may not be usable yet) but never aborts startup.
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Telemetry intentionally disabled; nothing to do.
            }
            Err(e) => {
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        // If the database exposes a health monitor task, tie its lifetime to
        // the shutdown signal so it stops with the rest of the runtime.
        if let Some(handle) = db.start_health_monitor() {
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Apply directory migrations plus any registered programmatically,
        // in that order.
        let runner = MigrationRunner::new(pool.clone());

        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Build this node's cluster identity. The advertised IP is currently
        // hard-coded to loopback.
        let hostname = get_hostname();

        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        let node_id = node_info.id;
        self.node_id = node_id;

        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Registration failures are tolerated (e.g. first boot before the
        // cluster tables exist) and only logged at debug level.
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Scheduler nodes contend for leadership; losing the initial attempt
        // is fine — the election loop below keeps retrying.
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Coordinates deregistration/lease release during teardown.
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Shared outbound HTTP client wrapped in a circuit breaker.
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Handles of spawned background services (currently detached; kept
        // for potential joining).
        let mut handles = Vec::new();

        // Heartbeat loop: keeps this node's liveness row fresh.
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Leader-election maintenance loop (scheduler role only).
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Worker role: poll the job queue and execute registered jobs.
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Scheduler role: cron runner. When there is no leader election the
        // runner treats itself as leader unconditionally.
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                // Runs untouched for 15 minutes are considered stale.
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Scheduler role: workflow scheduler, stopped via a dedicated
        // cancellation token rather than the broadcast channel.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Dispatchers shared by the gateway, webhooks, and daemons.
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // Scheduler role: daemon runner, only when daemons are registered.
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        let mut reactor_handle = None;

        // A PORT env var (e.g. from a PaaS) overrides the configured gateway
        // port; unparsable values silently fall back to the config.
        let gateway_port = std::env::var("PORT")
            .ok()
            .and_then(|p| p.parse::<u16>().ok())
            .unwrap_or(self.config.gateway.port);

        // Gateway role: build the HTTP router and serve it.
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: gateway_port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // CORS is implicitly enabled when any origin is configured.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
            };

            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                self.db.clone().expect("Database must be initialized"),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Reactor start failure is non-fatal; we keep serving without it.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            let api_router = gateway.router();

            // All built-in API endpoints live under /_api.
            let mut router = Router::new().nest("/_api", api_router);

            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // Webhook CORS mirrors the gateway policy: permissive when
                // "*" is configured, otherwise restricted to parseable
                // origins; a no-op layer when CORS is off entirely.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    CorsLayer::new()
                };

                // POST-only catch-all with a 1 MiB body cap, concurrency
                // limit, and request timeout mapped to HTTP status codes.
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // User-provided routes are merged on top of the built-in API.
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // The frontend handler serves anything no other route matched.
            // NOTE(review): registered via `get(...)`, so only GET/HEAD
            // fallbacks reach it — confirm that is intended.
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        // One-line startup summary of everything registered.
        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // Periodic pool-metrics sampler; detached (not tied to shutdown).
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = gateway_port,
            "Forge started"
        );

        // Block until either ctrl-c or a programmatic shutdown() call.
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        tracing::debug!("Graceful shutdown starting");

        // Teardown order: workflows → cluster deregistration → election →
        // reactor → database → telemetry.
        workflow_shutdown_token.cancel();

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        if let Some(ref election) = leader_election {
            election.stop();
        }

        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Request a graceful shutdown of a running instance.
    ///
    /// Safe to call from any thread; a no-op error (no active receivers,
    /// i.e. `run()` not executing) is deliberately ignored.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
666
/// Builder for [`Forge`].
///
/// Collects configuration, registries, migrations, and optional routing
/// hooks, then produces a runtime via [`ForgeBuilder::build`].
pub struct ForgeBuilder {
    // Required; `build()` fails without it.
    config: Option<ForgeConfig>,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    // Defaults to "./migrations".
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    frontend_handler: Option<FrontendHandler>,
    custom_routes: Option<Router>,
}
682
impl ForgeBuilder {
    /// Create a builder with empty registries and the default
    /// `migrations` directory.
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            mcp_registry: McpToolRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            daemon_registry: DaemonRegistry::new(),
            webhook_registry: WebhookRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
            custom_routes: None,
        }
    }

    /// Override the directory scanned for SQL migration files.
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a programmatic migration, applied after directory migrations.
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Install the fallback handler that serves the frontend.
    ///
    /// NOTE(review): unlike most builder methods this takes `&mut self` and
    /// returns `()`, so it cannot be chained — kept as-is for callers.
    pub fn frontend_handler(&mut self, handler: FrontendHandler) {
        self.frontend_handler = Some(handler);
    }

    /// Install extra axum routes merged into the gateway router.
    /// Takes `&mut self` like `frontend_handler`; not chainable.
    pub fn custom_routes(&mut self, router: Router) {
        self.custom_routes = Some(router);
    }

    /// Supply the (required) configuration.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Mutable access to the query/mutation registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool type; chainable.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Mutable access to the daemon registry.
    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    /// Mutable access to the webhook registry.
    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }

    /// Register a query function; chainable.
    pub fn register_query<Q: ForgeQuery>(&mut self) -> &mut Self
    where
        Q::Args: serde::de::DeserializeOwned + Send + 'static,
        Q::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_query::<Q>();
        self
    }

    /// Register a mutation function; chainable.
    pub fn register_mutation<M: ForgeMutation>(&mut self) -> &mut Self
    where
        M::Args: serde::de::DeserializeOwned + Send + 'static,
        M::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_mutation::<M>();
        self
    }

    /// Register a background job; chainable.
    pub fn register_job<J: forge_core::ForgeJob>(&mut self) -> &mut Self
    where
        J::Args: serde::de::DeserializeOwned + Send + 'static,
        J::Output: serde::Serialize + Send + 'static,
    {
        self.job_registry.register::<J>();
        self
    }

    /// Register a cron task; chainable.
    pub fn register_cron<C: forge_core::ForgeCron>(&mut self) -> &mut Self {
        self.cron_registry.register::<C>();
        self
    }

    /// Register a workflow; chainable.
    pub fn register_workflow<W: forge_core::ForgeWorkflow>(&mut self) -> &mut Self
    where
        W::Input: serde::de::DeserializeOwned,
        W::Output: serde::Serialize,
    {
        self.workflow_registry.register::<W>();
        self
    }

    /// Register a daemon; chainable.
    pub fn register_daemon<D: forge_core::ForgeDaemon>(&mut self) -> &mut Self {
        self.daemon_registry.register::<D>();
        self
    }

    /// Register a webhook; chainable.
    pub fn register_webhook<W: forge_core::ForgeWebhook>(&mut self) -> &mut Self {
        self.webhook_registry.register::<W>();
        self
    }

    /// Consume the builder and produce a [`Forge`] instance.
    ///
    /// # Errors
    /// Returns [`ForgeError::Config`] when no configuration was supplied.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;

        // Capacity 1 is enough: shutdown is a single edge-triggered signal.
        let (shutdown_tx, _) = broadcast::channel(1);

        Ok(Forge {
            config,
            db: None,
            // Placeholder id; `run()` replaces it with the registered node's id.
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            mcp_registry: self.mcp_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            daemon_registry: Arc::new(self.daemon_registry),
            webhook_registry: Arc::new(self.webhook_registry),
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            frontend_handler: self.frontend_handler,
            custom_routes: self.custom_routes,
        })
    }
}
879
880impl Default for ForgeBuilder {
881 fn default() -> Self {
882 Self::new()
883 }
884}
885
886#[cfg(unix)]
887fn get_hostname() -> String {
888 nix::unistd::gethostname()
889 .map(|h| h.to_string_lossy().to_string())
890 .unwrap_or_else(|_| "unknown".to_string())
891}
892
/// Best-effort hostname lookup on non-Unix platforms.
///
/// Checks `COMPUTERNAME` (Windows) first, then `HOSTNAME`, and falls back
/// to `"unknown"` when neither environment variable is set.
#[cfg(not(unix))]
fn get_hostname() -> String {
    for var in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(var) {
            return name;
        }
    }
    "unknown".to_string()
}
899
900fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
902 match role {
903 ConfigNodeRole::Gateway => NodeRole::Gateway,
904 ConfigNodeRole::Function => NodeRole::Function,
905 ConfigNodeRole::Worker => NodeRole::Worker,
906 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
907 }
908}
909
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    // Minimal MCP tool used to exercise registry registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    // A fresh builder starts without a configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    // build() must fail when no configuration was supplied.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    // build() succeeds once a configuration is provided.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    // Registering a tool is reflected in the MCP registry length.
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let mut builder = ForgeBuilder::new();
        builder.register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    // Every config role maps to the matching cluster role.
    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}