1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34 ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Signature of a user-supplied fallback handler used to serve the frontend
/// (installed via [`ForgeBuilder::frontend_handler`] and mounted as the
/// router fallback in `Forge::run`).
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
/// Convenience re-exports for application code built on Forge.
///
/// `use …::prelude::*;` brings the common traits, context types, and
/// third-party essentials (chrono, uuid, serde, axum) into scope in one line.
pub mod prelude {
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// UTC timestamp alias used throughout Forge applications.
    pub type Timestamp = DateTime<Utc>;

    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    pub use axum;

    pub use crate::{Forge, ForgeBuilder};
}
89
/// A fully-configured Forge application instance.
///
/// Constructed via [`ForgeBuilder::build`]; [`Forge::run`] connects to the
/// database, applies migrations, registers this node in the cluster, starts
/// the role-appropriate background services, and serves until shutdown.
pub struct Forge {
    // Loaded application configuration (project, database, gateway, node roles, ...).
    config: ForgeConfig,
    // Database handle; `None` until `run()` establishes the connection pools.
    db: Option<Database>,
    // This node's cluster identity; replaced by the freshly generated
    // `NodeInfo` id during `run()`.
    node_id: NodeId,
    // Query/mutation functions exposed through the gateway.
    function_registry: FunctionRegistry,
    // MCP tools exposed through the gateway's MCP endpoint.
    mcp_registry: McpToolRegistry,
    // Background job handlers executed by worker nodes.
    job_registry: JobRegistry,
    // Scheduled (cron) tasks; shared with the cron runner task.
    cron_registry: Arc<CronRegistry>,
    // Workflow definitions executed by the workflow scheduler/executor.
    workflow_registry: WorkflowRegistry,
    // Long-running daemons; shared with the daemon runner task.
    daemon_registry: Arc<DaemonRegistry>,
    // Incoming webhook handlers; shared with the webhook router state.
    webhook_registry: Arc<WebhookRegistry>,
    // Broadcast used both for external `shutdown()` requests and for
    // fanning the signal out to spawned background tasks.
    shutdown_tx: broadcast::Sender<()>,
    // Directory scanned for SQL migration files at startup.
    migrations_dir: PathBuf,
    // Inline migrations added programmatically via the builder.
    extra_migrations: Vec<Migration>,
    // Optional fallback handler that serves the frontend.
    frontend_handler: Option<FrontendHandler>,
    // Optional user-defined axum routes merged into the gateway router.
    custom_routes: Option<Router>,
}
112
113impl Forge {
    /// Creates a new [`ForgeBuilder`] for assembling a `Forge` instance.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Returns this node's cluster id.
    ///
    /// NOTE(review): before `run()` is called this is the placeholder id
    /// generated in `build()`, not the registered cluster identity.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Returns the loaded application configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Returns the query/mutation function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Returns a mutable handle to the function registry (for registering
    /// functions after `build()` but before `run()`).
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Returns a mutable handle to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Registers MCP tool `T`; chainable via the returned `&mut Self`.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Returns the background-job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Returns a mutable handle to the background-job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Returns a shared handle to the cron registry.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Returns the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Returns a mutable handle to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Returns a shared handle to the daemon registry.
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Returns a shared handle to the webhook registry.
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }
184
    /// Starts the node and serves until shutdown.
    ///
    /// Startup sequence (order matters — later stages depend on earlier ones):
    /// 1. telemetry init, 2. database connect, 3. migrations, 4. cluster node
    /// registration, 5. leader election (scheduler role only), 6. role-gated
    /// background services (heartbeat, worker, cron, workflow scheduler,
    /// daemons), 7. gateway HTTP server, then blocks until ctrl-c or a
    /// [`Forge::shutdown`] signal and performs graceful teardown.
    ///
    /// # Errors
    /// Returns an error if the database connection, migration loading/running,
    /// or auth configuration fails. Non-fatal startup problems (node
    /// registration, leadership acquisition, reactor start) are logged and
    /// tolerated.
    pub async fn run(mut self) -> Result<()> {
        // --- Telemetry ------------------------------------------------------
        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Telemetry intentionally disabled; nothing to do.
            }
            Err(e) => {
                // Telemetry failure is non-fatal; fall back to stderr.
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        // --- Database -------------------------------------------------------
        tracing::debug!("Connecting to database");

        let db =
            Database::from_config_with_service(&self.config.database, &self.config.project.name)
                .await?;
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        if let Some(handle) = db.start_health_monitor() {
            // Keep the health monitor alive until it exits on its own or a
            // shutdown signal arrives, whichever comes first.
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // --- Migrations -----------------------------------------------------
        let runner = MigrationRunner::new(pool.clone());

        // File-based migrations first, then any registered inline via the builder.
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // --- Cluster identity -----------------------------------------------
        let hostname = get_hostname();

        // NOTE(review): the advertised address is hard-coded to loopback here;
        // confirm whether multi-host clusters need a real interface address.
        let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        // Adopt the freshly generated id as this instance's identity.
        let node_id = node_info.id;
        self.node_id = node_id;

        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Registration failures are tolerated so a fresh database (before
        // system tables exist) can still boot.
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // --- Leader election (scheduler role only) --------------------------
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Best-effort initial attempt; the election loop below keeps retrying.
            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Shared outbound HTTP client with circuit breaking.
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // Handles of spawned background tasks (currently only collected,
        // not awaited on shutdown).
        let mut handles = Vec::new();

        // Heartbeat loop: keeps this node's liveness record fresh.
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Leader-election maintenance loop (renewal / re-acquisition).
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // --- Worker role: background job execution --------------------------
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // --- Scheduler role: cron runner ------------------------------------
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                // With no election in play, assume leadership; otherwise the
                // runner consults the election handle.
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // --- Scheduler role: workflow scheduler ------------------------------
        // Token created unconditionally so the shutdown path can cancel it
        // without caring whether the scheduler was actually started.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Dispatchers shared by the gateway, daemons, and webhook handlers.
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // --- Scheduler role: daemon runner (only if daemons registered) ------
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        // --- Gateway role: HTTP server ---------------------------------------
        let mut reactor_handle = None;

        // PORT env var (common in PaaS deployments) overrides the configured port.
        let gateway_port = std::env::var("PORT")
            .ok()
            .and_then(|p| p.parse::<u16>().ok())
            .unwrap_or(self.config.gateway.port);

        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: gateway_port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // Listing any origin implicitly enables CORS.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
            };

            let gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                // Safe: `self.db` was set right after connecting above.
                self.db.clone().expect("Database must be initialized"),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Realtime reactor; a failed start degrades rather than aborts.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            let api_router = gateway.router();

            // All framework endpoints live under the /_api prefix.
            let mut router = Router::new().nest("/_api", api_router);

            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // CORS for webhooks mirrors the gateway's CORS settings:
                // wildcard -> allow any origin; otherwise only parseable origins.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    }
                } else {
                    // No-op layer when CORS is disabled.
                    CorsLayer::new()
                };

                // Webhooks accept POST on any subpath, capped at a 1 MiB body,
                // with timeout + concurrency protection converted to HTTP errors.
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // User-defined routes are merged at the top level (not under /_api).
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                // NOTE(review): wrapping in `get()` limits the frontend
                // fallback to GET requests — confirm non-GET fallthrough is
                // intended to 405.
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        // Startup summary of everything registered.
        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // Periodic connection-pool metrics; detached task, ends with the process.
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        tracing::info!(
            node_id = %node_id,
            roles = ?roles,
            port = gateway_port,
            "Forge started"
        );

        // --- Wait for shutdown ------------------------------------------------
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        // --- Graceful teardown (reverse-ish of startup) -----------------------
        tracing::debug!("Graceful shutdown starting");

        // Stop the workflow scheduler loop first so no new runs are claimed.
        workflow_shutdown_token.cancel();

        // Deregister from the cluster / release leadership via GracefulShutdown.
        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        if let Some(ref election) = leader_election {
            election.stop();
        }

        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        // Close pools last, after everything using them has been signalled.
        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }
662
    /// Signals a running instance (in [`Forge::run`]) to begin graceful shutdown.
    ///
    /// The send error is ignored: it only occurs when nothing is listening,
    /// i.e. `run()` is not currently active.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
667}
668
/// Fluent builder for assembling a [`Forge`] instance.
///
/// Collects configuration, function/job/cron/workflow/daemon/webhook/MCP
/// registrations, migrations, and optional routing hooks before `build()`.
pub struct ForgeBuilder {
    // Required; `build()` fails without it.
    config: Option<ForgeConfig>,
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    // Directory scanned for SQL migrations; defaults to "migrations".
    migrations_dir: PathBuf,
    // Inline migrations appended after the directory ones.
    extra_migrations: Vec<Migration>,
    // Optional frontend fallback handler.
    frontend_handler: Option<FrontendHandler>,
    // Optional user-defined axum routes merged into the gateway router.
    custom_routes: Option<Router>,
}
684
685impl ForgeBuilder {
686 pub fn new() -> Self {
688 Self {
689 config: None,
690 function_registry: FunctionRegistry::new(),
691 mcp_registry: McpToolRegistry::new(),
692 job_registry: JobRegistry::new(),
693 cron_registry: CronRegistry::new(),
694 workflow_registry: WorkflowRegistry::new(),
695 daemon_registry: DaemonRegistry::new(),
696 webhook_registry: WebhookRegistry::new(),
697 migrations_dir: PathBuf::from("migrations"),
698 extra_migrations: Vec::new(),
699 frontend_handler: None,
700 custom_routes: None,
701 }
702 }
703
    /// Overrides the directory scanned for SQL migration files
    /// (default: `migrations`).
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Adds an inline migration, applied after those loaded from the
    /// migrations directory, in registration order.
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Installs a fallback handler that serves the frontend for requests
    /// not matched by the API routes.
    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
        self.frontend_handler = Some(handler);
        self
    }

    /// Merges user-defined axum routes into the gateway router at startup.
    pub fn custom_routes(mut self, router: Router) -> Self {
        self.custom_routes = Some(router);
        self
    }

    /// Sets the application configuration (required before `build()`).
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }
756
    /// Mutable access to the query/mutation registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the background-job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Mutable access to the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Registers MCP tool `T` (by-value, chainable variant).
    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Mutable access to the daemon registry.
    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    /// Mutable access to the webhook registry.
    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }
797
    /// Registers query function `Q`, exposed through the gateway.
    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
    where
        Q::Args: serde::de::DeserializeOwned + Send + 'static,
        Q::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_query::<Q>();
        self
    }

    /// Registers mutation function `M`, exposed through the gateway.
    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
    where
        M::Args: serde::de::DeserializeOwned + Send + 'static,
        M::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_mutation::<M>();
        self
    }

    /// Registers background job `J`, executed on worker nodes.
    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
    where
        J::Args: serde::de::DeserializeOwned + Send + 'static,
        J::Output: serde::Serialize + Send + 'static,
    {
        self.job_registry.register::<J>();
        self
    }

    /// Registers scheduled task `C`, executed by the cron runner.
    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
        self.cron_registry.register::<C>();
        self
    }

    /// Registers workflow `W`, executed by the workflow scheduler.
    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
    where
        W::Input: serde::de::DeserializeOwned,
        W::Output: serde::Serialize,
    {
        self.workflow_registry.register::<W>();
        self
    }

    /// Registers long-running daemon `D`, managed by the daemon runner.
    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
        self.daemon_registry.register::<D>();
        self
    }

    /// Registers webhook handler `W`, served under `/_api/webhooks`.
    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
        self.webhook_registry.register::<W>();
        self
    }
855
856 pub fn build(self) -> Result<Forge> {
858 let config = self
859 .config
860 .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
861
862 let (shutdown_tx, _) = broadcast::channel(1);
863
864 Ok(Forge {
865 config,
866 db: None,
867 node_id: NodeId::new(),
868 function_registry: self.function_registry,
869 mcp_registry: self.mcp_registry,
870 job_registry: self.job_registry,
871 cron_registry: Arc::new(self.cron_registry),
872 workflow_registry: self.workflow_registry,
873 daemon_registry: Arc::new(self.daemon_registry),
874 webhook_registry: Arc::new(self.webhook_registry),
875 shutdown_tx,
876 migrations_dir: self.migrations_dir,
877 extra_migrations: self.extra_migrations,
878 frontend_handler: self.frontend_handler,
879 custom_routes: self.custom_routes,
880 })
881 }
882}
883
884impl Default for ForgeBuilder {
885 fn default() -> Self {
886 Self::new()
887 }
888}
889
890#[cfg(unix)]
891fn get_hostname() -> String {
892 nix::unistd::gethostname()
893 .map(|h| h.to_string_lossy().to_string())
894 .unwrap_or_else(|_| "unknown".to_string())
895}
896
897#[cfg(not(unix))]
898fn get_hostname() -> String {
899 std::env::var("COMPUTERNAME")
900 .or_else(|_| std::env::var("HOSTNAME"))
901 .unwrap_or_else(|_| "unknown".to_string())
902}
903
904fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
906 match role {
907 ConfigNodeRole::Gateway => NodeRole::Gateway,
908 ConfigNodeRole::Function => NodeRole::Function,
909 ConfigNodeRole::Worker => NodeRole::Worker,
910 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
911 }
912}
913
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    // Minimal MCP tool used to exercise builder registration.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    // A fresh builder starts without configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    // build() must fail when no configuration was supplied.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    // build() succeeds once a configuration is provided.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    // Registering an MCP tool is reflected in the registry length.
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    // Every config role must map to the matching cluster role.
    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}