1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34 ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Signature of a user-supplied frontend fallback handler: a plain `fn` that
/// receives the raw HTTP request and returns a boxed future resolving to the
/// response. Installed as the router's fallback via
/// `ForgeBuilder::frontend_handler`, so it only sees requests no other route
/// matched.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
/// Convenience prelude: `use forge::prelude::*;` pulls in everything a typical
/// application needs to define functions, jobs, crons, workflows, daemons,
/// webhooks, and MCP tools.
pub mod prelude {
    // Common third-party foundations re-exported so applications don't need
    // direct dependencies on them.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Shorthand for the timestamp type used throughout Forge (UTC).
    pub type Timestamp = DateTime<Utc>;

    // Core traits and context types for each kind of registrable unit
    // (query/mutation, job, cron, workflow, daemon, webhook, MCP tool).
    pub use forge_core::auth::TokenPair;
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Re-export axum so applications can build custom routes against the same
    // version Forge links.
    pub use axum;

    pub use crate::{Forge, ForgeBuilder};
}
90
/// The assembled Forge runtime. Built via [`ForgeBuilder`]; `run()` boots the
/// whole node (database, migrations, cluster membership, background runners,
/// and optionally the HTTP gateway) and blocks until shutdown.
pub struct Forge {
    // Full application configuration, fixed at build time (aside from env
    // overrides applied at the start of `run()`).
    config: ForgeConfig,
    // Populated during `run()` once the database connection is established.
    db: Option<Database>,
    // This node's cluster identity; replaced in `run()` by the id generated
    // with the freshly constructed `NodeInfo`.
    node_id: NodeId,
    // Registries of user-registered units, populated through the builder.
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: Arc<CronRegistry>,
    workflow_registry: WorkflowRegistry,
    daemon_registry: Arc<DaemonRegistry>,
    webhook_registry: Arc<WebhookRegistry>,
    // Broadcast channel used to signal graceful shutdown to `run()` and any
    // subscribed background tasks (see `shutdown()`).
    shutdown_tx: broadcast::Sender<()>,
    // Directory scanned for SQL migrations, plus programmatically added ones.
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    // Optional fallback handler for frontend (non-API) routes.
    frontend_handler: Option<FrontendHandler>,
    // Optional user router merged into the gateway router.
    custom_routes: Option<Router>,
}
113
impl Forge {
    /// Entry point for constructing a [`Forge`] instance.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// This node's cluster identifier. Note: the id is regenerated early in
    /// `run()`, so the value observed before `run()` is a placeholder.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Borrow the active configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Borrow the query/mutation registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Mutably borrow the query/mutation registry (for late registration).
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutably borrow the MCP tool registry.
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool after build; returns `&mut Self` for chaining.
    pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
        self.mcp_registry.register::<T>();
        self
    }

    /// Borrow the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Mutably borrow the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Shared handle to the cron registry (cheap `Arc` clone).
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Borrow the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Mutably borrow the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Shared handle to the daemon registry (cheap `Arc` clone).
    pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
        self.daemon_registry.clone()
    }

    /// Shared handle to the webhook registry (cheap `Arc` clone).
    pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
        self.webhook_registry.clone()
    }

    /// Upsert every registered workflow definition into
    /// `forge_workflow_definitions`, enforcing that a (name, version) pair
    /// never changes its signature once persisted.
    ///
    /// # Errors
    /// - `ForgeError::Config` if a registered workflow's signature differs
    ///   from the persisted one for the same name+version (contract drift).
    /// - `ForgeError::Database` for any underlying sqlx failure.
    async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
        for info in self.workflow_registry.definitions() {
            // NOTE(review): the final else arm duplicates "active", so a
            // workflow that is neither active nor deprecated is still stored
            // as "active" — confirm this is intentional and not a missing
            // third status.
            let status = if info.is_active {
                "active"
            } else if info.is_deprecated {
                "deprecated"
            } else {
                "active"
            };

            // Look up any previously persisted definition for this
            // name+version to compare signatures.
            let existing = sqlx::query!(
                r#"
                SELECT workflow_signature FROM forge_workflow_definitions
                WHERE workflow_name = $1 AND workflow_version = $2
                "#,
                info.name,
                info.version,
            )
            .fetch_optional(pool)
            .await
            .map_err(|e| ForgeError::Database(e.to_string()))?;

            if let Some(row) = existing {
                // Same version with a different signature means the persisted
                // contract changed in place — refuse to start.
                if row.workflow_signature != info.signature {
                    return Err(ForgeError::Config(format!(
                        "Workflow '{}' version '{}' has a different signature than previously registered. \
                         Persisted contract changed under the same version. \
                         Expected signature: {}, got: {}. \
                         Create a new version instead of modifying the existing one.",
                        info.name, info.version, row.workflow_signature, info.signature
                    )));
                }
                // Signature unchanged: only refresh the status column.
                sqlx::query!(
                    "UPDATE forge_workflow_definitions SET status = $3 WHERE workflow_name = $1 AND workflow_version = $2",
                    info.name,
                    info.version,
                    status,
                )
                .execute(pool)
                .await
                .map_err(|e| ForgeError::Database(e.to_string()))?;
            } else {
                // First time this name+version is seen: insert it.
                sqlx::query!(
                    r#"
                    INSERT INTO forge_workflow_definitions (workflow_name, workflow_version, workflow_signature, status)
                    VALUES ($1, $2, $3, $4)
                    "#,
                    info.name,
                    info.version,
                    info.signature,
                    status,
                )
                .execute(pool)
                .await
                .map_err(|e| ForgeError::Database(e.to_string()))?;
            }

            tracing::debug!(
                workflow = info.name,
                version = info.version,
                signature = info.signature,
                status = status,
                "Workflow definition registered"
            );
        }

        Ok(())
    }

    /// Boot the node and block until shutdown.
    ///
    /// Startup order (order matters — later phases depend on earlier ones):
    /// telemetry → database → migrations → workflow definition persistence →
    /// node identity/registration → leader election (scheduler role) →
    /// background runners (heartbeat, worker, cron, workflow scheduler,
    /// daemons) → gateway HTTP server (gateway role) → wait for ctrl-c or
    /// `shutdown()` → graceful teardown.
    ///
    /// # Errors
    /// Returns early on database connection, migration, workflow-signature,
    /// or gateway-configuration failures. Cluster registration and leadership
    /// failures are logged and tolerated (tables may not exist yet).
    pub async fn run(mut self) -> Result<()> {
        // Environment variables may override observability settings.
        self.config.observability.apply_env_overrides();

        let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
            &self.config.observability,
            &self.config.project.name,
            &self.config.project.version,
        );
        // Telemetry failure is non-fatal: log to stderr and continue.
        match forge_runtime::init_telemetry(
            &telemetry_config,
            &self.config.project.name,
            &self.config.observability.log_level,
        ) {
            Ok(true) => {}
            Ok(false) => {
                // Telemetry intentionally disabled — nothing to do.
            }
            Err(e) => {
                eprintln!("forge: failed to initialize telemetry: {e}");
            }
        }

        tracing::debug!("Connecting to database");

        let db =
            Database::from_config_with_service(&self.config.database, &self.config.project.name)
                .await?;
        // Separate pools for primary, jobs, and observability traffic.
        let pool = db.primary().clone();
        let jobs_pool = db.jobs_pool().clone();
        let observability_pool = db.observability_pool().clone();
        // Tie the optional health-monitor task's lifetime to shutdown.
        if let Some(handle) = db.start_health_monitor() {
            let mut shutdown_rx = self.shutdown_tx.subscribe();
            tokio::spawn(async move {
                tokio::select! {
                    _ = shutdown_rx.recv() => {}
                    _ = handle => {}
                }
            });
        }
        self.db = Some(db);

        tracing::debug!("Database connected");

        // Apply user migrations (directory + programmatically supplied).
        let runner = MigrationRunner::new(pool.clone());

        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::debug!("Migrations applied");

        // Persist workflow contracts before any scheduler can run them.
        if !self.workflow_registry.is_empty() {
            self.persist_workflow_definitions(&pool).await?;
        }

        let hostname = get_hostname();

        // HOST env overrides the bind address; fall back to 0.0.0.0 on any
        // parse failure.
        let ip_address: IpAddr = std::env::var("HOST")
            .unwrap_or_else(|_| "0.0.0.0".to_string())
            .parse()
            .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));

        // PORT env overrides the configured gateway port (e.g. for PaaS).
        if let Ok(port_str) = std::env::var("PORT")
            && let Ok(port) = port_str.parse::<u16>()
        {
            self.config.gateway.port = port;
        }

        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        // Adopt the identity generated with the NodeInfo.
        let node_id = node_info.id;
        self.node_id = node_id;

        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Registration is best-effort: cluster tables may not exist yet.
        if let Err(e) = node_registry.register().await {
            tracing::debug!("Failed to register node (tables may not exist): {}", e);
        }

        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::debug!("Failed to set node status: {}", e);
        }

        // Scheduler nodes compete for leadership; losing is fine (the
        // election loop below keeps retrying).
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            if let Err(e) = election.try_become_leader().await {
                tracing::debug!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Shared outbound HTTP client wrapped in a circuit breaker.
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        // NOTE(review): spawned task handles are collected here but never
        // awaited or aborted; tasks are left to die with the process after
        // graceful shutdown — confirm this is intentional.
        let mut handles = Vec::new();

        // Heartbeat loop: keeps this node's liveness row fresh.
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Leader-election maintenance loop (renew/contest leadership).
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Worker role: poll and execute queued jobs.
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(jobs_pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::new(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                jobs_pool.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::debug!("Job worker started");
        }

        // Scheduler role: cron runner. When there is no election handle,
        // the node treats itself as leader (single-node mode).
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = jobs_pool.clone();
            let cron_http = http_client.clone();
            let cron_leader_election = leader_election.clone();

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader: cron_leader_election.is_none(),
                leader_election: cron_leader_election,
                run_stale_threshold: Duration::from_secs(15 * 60),
            };

            let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::debug!("Cron scheduler started");
        }

        // Workflow scheduler gets its own cancellation token so it can be
        // stopped first during teardown.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                jobs_pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(jobs_pool.clone()));
            let scheduler = WorkflowScheduler::new(
                jobs_pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::debug!("Workflow scheduler started");
        }

        // Dispatchers shared by gateway, daemons, and webhooks to enqueue
        // jobs and start workflows.
        let job_queue_for_dispatch = JobQueue::new(jobs_pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(
            job_queue_for_dispatch,
            self.job_registry.clone(),
        ));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            jobs_pool.clone(),
            http_client.clone(),
        ));

        // Daemons only run on scheduler nodes, and only if any registered.
        if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
            let daemon_registry = self.daemon_registry.clone();
            let daemon_pool = jobs_pool.clone();
            let daemon_http = http_client.clone();
            let daemon_shutdown_rx = self.shutdown_tx.subscribe();

            let daemon_runner = DaemonRunner::new(
                daemon_registry,
                daemon_pool,
                daemon_http,
                node_id.as_uuid(),
                daemon_shutdown_rx,
            )
            .with_job_dispatch(job_dispatcher.clone())
            .with_workflow_dispatch(workflow_executor.clone());

            handles.push(tokio::spawn(async move {
                if let Err(e) = daemon_runner.run().await {
                    tracing::error!("Daemon runner error: {}", e);
                }
            }));

            tracing::debug!("Daemon runner started");
        }

        let mut reactor_handle = None;

        // Gateway role: assemble and serve the HTTP router.
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                sse_max_sessions: self.config.gateway.sse_max_sessions,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // Listing any origin implicitly enables CORS.
                cors_enabled: self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty(),
                cors_origins: self.config.gateway.cors_origins.clone(),
                auth: AuthConfig::from_forge_config(&self.config.auth)
                    .map_err(|e| ForgeError::Config(e.to_string()))?,
                mcp: self.config.mcp.clone(),
                quiet_routes: self.config.gateway.quiet_routes.clone(),
                max_body_size_bytes: self.config.gateway.max_body_size_bytes()?,
                token_ttl: forge_core::AuthTokenTtl {
                    access_token_secs: self.config.auth.access_token_ttl_secs(),
                    refresh_token_days: self.config.auth.refresh_token_ttl_days(),
                },
                project_name: self.config.project.name.clone(),
            };

            let db_ref = self
                .db
                .clone()
                .ok_or_else(|| ForgeError::Internal("Database not initialized".into()))?;

            let mut gateway = GatewayServer::new(
                gateway_config,
                self.function_registry.clone(),
                db_ref.clone(),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone())
            .with_mcp_registry(self.mcp_registry.clone());

            // Optional analytics/diagnostics signal collection.
            if self.config.signals.enabled {
                let signals_pool = std::sync::Arc::new(db_ref.analytics_pool().clone());
                let collector = forge_runtime::signals::SignalsCollector::spawn(
                    signals_pool.clone(),
                    self.config.signals.batch_size,
                    std::time::Duration::from_millis(self.config.signals.flush_interval_ms),
                );
                gateway = gateway
                    .with_signals_collector(collector)
                    .with_signals_anonymize_ip(self.config.signals.anonymize_ip);

                // Background task that expires stale analytics sessions.
                forge_runtime::signals::session::spawn_session_reaper(
                    signals_pool,
                    self.config.signals.session_timeout_mins,
                );

                tracing::info!("Signals enabled (analytics + diagnostics)");
            }

            // Reactor start failure is tolerated: gateway serves without it.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::debug!("Reactor started");
                reactor_handle = Some(reactor);
            }

            let api_router = gateway.router();

            // All API traffic lives under /_api.
            let mut router = Router::new().nest("/_api", api_router);

            if !self.webhook_registry.is_empty() {
                use axum::routing::post;
                use tower_http::cors::{Any, CorsLayer};

                let webhook_state = Arc::new(
                    WebhookState::new(self.webhook_registry.clone(), pool.clone())
                        .with_job_dispatcher(job_dispatcher.clone()),
                );

                // CORS for webhooks mirrors the gateway policy: wildcard
                // origin gets a permissive layer; explicit origins get a
                // credentialed allow-list; otherwise a no-op layer.
                let webhook_cors = if self.config.gateway.cors_enabled
                    || !self.config.gateway.cors_origins.is_empty()
                {
                    if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
                        CorsLayer::new()
                            .allow_origin(Any)
                            .allow_methods(Any)
                            .allow_headers(Any)
                    } else {
                        use axum::http::Method;
                        // Silently drops origins that fail to parse.
                        let origins: Vec<_> = self
                            .config
                            .gateway
                            .cors_origins
                            .iter()
                            .filter_map(|o| o.parse().ok())
                            .collect();
                        CorsLayer::new()
                            .allow_origin(origins)
                            .allow_methods([
                                Method::GET,
                                Method::POST,
                                Method::PUT,
                                Method::DELETE,
                                Method::PATCH,
                                Method::OPTIONS,
                            ])
                            .allow_headers([
                                axum::http::header::CONTENT_TYPE,
                                axum::http::header::AUTHORIZATION,
                                axum::http::header::ACCEPT,
                                axum::http::HeaderName::from_static("x-webhook-signature"),
                                axum::http::HeaderName::from_static("x-idempotency-key"),
                            ])
                            .allow_credentials(true)
                    }
                } else {
                    CorsLayer::new()
                };

                // NOTE(review): webhook body limit is hard-coded to 1 MiB
                // while the main gateway uses the configurable
                // max_body_size_bytes — confirm the asymmetry is deliberate.
                let webhook_router = Router::new()
                    .route("/{*path}", post(webhook_handler).with_state(webhook_state))
                    .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
                    .layer(
                        tower::ServiceBuilder::new()
                            // Timeout/concurrency layers are fallible; map
                            // their errors to HTTP statuses.
                            .layer(axum::error_handling::HandleErrorLayer::new(
                                |err: tower::BoxError| async move {
                                    if err.is::<tower::timeout::error::Elapsed>() {
                                        return (
                                            axum::http::StatusCode::REQUEST_TIMEOUT,
                                            "Request timed out",
                                        );
                                    }
                                    (
                                        axum::http::StatusCode::SERVICE_UNAVAILABLE,
                                        "Server overloaded",
                                    )
                                },
                            ))
                            .layer(tower::limit::ConcurrencyLimitLayer::new(
                                self.config.gateway.max_connections,
                            ))
                            .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
                                self.config.gateway.request_timeout_secs,
                            ))),
                    )
                    .layer(webhook_cors);

                router = router.nest("/_api/webhooks", webhook_router);

                tracing::debug!(
                    webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
                    "Webhook routes registered"
                );
            }

            // MCP clients probe the OAuth well-known endpoints; serve real
            // metadata when OAuth is configured, otherwise an explicit 404
            // payload so clients fall back to unauthenticated connects.
            if self.config.mcp.enabled {
                use axum::routing::get;

                if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
                    router = router.nest("/_api", oauth_api_router);

                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
                                .with_state(oauth_state.clone()),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(forge_runtime::gateway::oauth::well_known_resource_metadata)
                                .with_state(oauth_state),
                        );

                    tracing::info!("OAuth 2.1 endpoints enabled for MCP");
                } else {
                    // Stub handler advertising that OAuth is unsupported.
                    async fn oauth_not_supported() -> impl axum::response::IntoResponse {
                        (
                            axum::http::StatusCode::NOT_FOUND,
                            axum::Json(serde_json::json!({
                                "error": "oauth_not_supported",
                                "error_description": "This server does not support OAuth. Connect without authentication."
                            })),
                        )
                    }
                    router = router
                        .route(
                            "/.well-known/oauth-authorization-server",
                            get(oauth_not_supported),
                        )
                        .route(
                            "/.well-known/oauth-protected-resource",
                            get(oauth_not_supported),
                        );
                }
            }

            // User-provided routes are merged last so they can add siblings
            // to /_api without clobbering it.
            if let Some(custom) = self.custom_routes.take() {
                router = router.merge(custom);
                tracing::debug!("Custom routes merged");
            }

            // Frontend handler catches anything no route matched.
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::debug!("Frontend handler enabled");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::debug!(addr = %addr, "Gateway server binding");
                // A bind failure is unrecoverable for the gateway task.
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
        }

        // Startup summary of everything registered.
        tracing::info!(
            queries = self.function_registry.queries().count(),
            mutations = self.function_registry.mutations().count(),
            jobs = self.job_registry.len(),
            crons = self.cron_registry.len(),
            workflows = self.workflow_registry.len(),
            daemons = self.daemon_registry.len(),
            webhooks = self.webhook_registry.len(),
            mcp_tools = self.mcp_registry.len(),
            "Functions registered"
        );

        // Periodic DB pool metrics; untracked task, dies with the process.
        {
            let metrics_pool = observability_pool;
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    forge_runtime::observability::record_pool_metrics(&metrics_pool);
                }
            });
        }

        let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
        let capabilities = &self.config.node.worker_capabilities;
        tracing::info!(
            node_id = %node_id,
            project = %self.config.project.name,
            version = env!("CARGO_PKG_VERSION"),
            roles = ?role_names,
            worker_capabilities = ?capabilities,
            port = self.config.gateway.port,
            db_pool_size = self.config.database.pool_size,
            cluster_discovery = ?self.config.cluster.discovery,
            observability = self.config.observability.enabled,
            mcp = self.config.mcp.enabled,
            "Forge started"
        );

        // Block until either ctrl-c or a programmatic shutdown() call.
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::debug!("Received ctrl-c");
            }
            _ = shutdown_rx.recv() => {
                tracing::debug!("Received shutdown notification");
            }
        }

        tracing::debug!("Graceful shutdown starting");

        // Teardown in reverse dependency order: workflow scheduler first,
        // then cluster de-registration, leadership release, reactor, DB.
        workflow_shutdown_token.cancel();

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!(error = %e, "Shutdown error");
        }

        if let Some(ref election) = leader_election {
            election.stop();
        }

        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
        }

        if let Some(ref db) = self.db {
            db.close().await;
        }

        forge_runtime::shutdown_telemetry();
        tracing::info!("Forge stopped");
        Ok(())
    }

    /// Signal a running `run()` (and any subscribed tasks) to shut down.
    /// Best-effort: the send result is ignored if nobody is listening.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
859
/// Builder for [`Forge`]. Collects configuration, registries, migrations,
/// and optional routing hooks; `build()` validates that a configuration was
/// supplied and assembles the runtime.
pub struct ForgeBuilder {
    // Required: build() fails without it.
    config: Option<ForgeConfig>,
    // Registries populated via the register_* / *_mut methods, or in bulk by
    // auto_register().
    function_registry: FunctionRegistry,
    mcp_registry: McpToolRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    daemon_registry: DaemonRegistry,
    webhook_registry: WebhookRegistry,
    // Defaults to "migrations"; extra_migrations are appended after the
    // directory scan.
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    // Optional routing hooks carried through to the gateway.
    frontend_handler: Option<FrontendHandler>,
    custom_routes: Option<Router>,
}
875
impl ForgeBuilder {
    /// Create an empty builder with fresh registries and the default
    /// `migrations` directory.
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            mcp_registry: McpToolRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            daemon_registry: DaemonRegistry::new(),
            webhook_registry: WebhookRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
            custom_routes: None,
        }
    }

    /// Override the directory scanned for SQL migration files.
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a programmatic migration (applied after directory migrations).
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Install a fallback handler for non-API (frontend) routes.
    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
        self.frontend_handler = Some(handler);
        self
    }

    /// Merge a custom axum router into the gateway router at startup.
    pub fn custom_routes(mut self, router: Router) -> Self {
        self.custom_routes = Some(router);
        self
    }

    /// Bulk-register everything discovered by the crate's auto-registration
    /// mechanism into all registries at once.
    pub fn auto_register(mut self) -> Self {
        crate::auto_register::auto_register_all(
            &mut self.function_registry,
            &mut self.job_registry,
            &mut self.cron_registry,
            &mut self.workflow_registry,
            &mut self.daemon_registry,
            &mut self.webhook_registry,
            &mut self.mcp_registry,
        );
        self
    }

    /// Supply the (required) configuration.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    // Mutable registry accessors, for callers that prefer imperative
    // registration over the chaining register_* methods below.

    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }

    /// Register an MCP tool type.
    pub fn register_mcp_tool<T: ForgeMcpTool>(mut self) -> Self {
        self.mcp_registry.register::<T>();
        self
    }

    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
        &mut self.daemon_registry
    }

    pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
        &mut self.webhook_registry
    }

    /// Register a query type (args must deserialize, output must serialize).
    pub fn register_query<Q: ForgeQuery>(mut self) -> Self
    where
        Q::Args: serde::de::DeserializeOwned + Send + 'static,
        Q::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_query::<Q>();
        self
    }

    /// Register a mutation type.
    pub fn register_mutation<M: ForgeMutation>(mut self) -> Self
    where
        M::Args: serde::de::DeserializeOwned + Send + 'static,
        M::Output: serde::Serialize + Send + 'static,
    {
        self.function_registry.register_mutation::<M>();
        self
    }

    /// Register a background job type.
    pub fn register_job<J: forge_core::ForgeJob>(mut self) -> Self
    where
        J::Args: serde::de::DeserializeOwned + Send + 'static,
        J::Output: serde::Serialize + Send + 'static,
    {
        self.job_registry.register::<J>();
        self
    }

    /// Register a cron task type.
    pub fn register_cron<C: forge_core::ForgeCron>(mut self) -> Self {
        self.cron_registry.register::<C>();
        self
    }

    /// Register a workflow type.
    pub fn register_workflow<W: forge_core::ForgeWorkflow>(mut self) -> Self
    where
        W::Input: serde::de::DeserializeOwned,
        W::Output: serde::Serialize,
    {
        self.workflow_registry.register::<W>();
        self
    }

    /// Register a daemon type.
    pub fn register_daemon<D: forge_core::ForgeDaemon>(mut self) -> Self {
        self.daemon_registry.register::<D>();
        self
    }

    /// Register a webhook type.
    pub fn register_webhook<W: forge_core::ForgeWebhook>(mut self) -> Self {
        self.webhook_registry.register::<W>();
        self
    }

    /// Assemble the [`Forge`] runtime.
    ///
    /// # Errors
    /// Returns `ForgeError::Config` if no configuration was supplied.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;

        // Capacity 1 is enough: shutdown is a one-shot broadcast signal.
        let (shutdown_tx, _) = broadcast::channel(1);

        Ok(Forge {
            config,
            db: None,
            // Placeholder id; run() replaces it with the NodeInfo-derived id.
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            mcp_registry: self.mcp_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            daemon_registry: Arc::new(self.daemon_registry),
            webhook_registry: Arc::new(self.webhook_registry),
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            frontend_handler: self.frontend_handler,
            custom_routes: self.custom_routes,
        })
    }
}
1093
/// `ForgeBuilder::default()` is identical to `ForgeBuilder::new()`.
impl Default for ForgeBuilder {
    fn default() -> Self {
        Self::new()
    }
}
1099
1100#[cfg(unix)]
1101fn get_hostname() -> String {
1102 nix::unistd::gethostname()
1103 .map(|h| h.to_string_lossy().to_string())
1104 .unwrap_or_else(|_| "unknown".to_string())
1105}
1106
/// Best-effort hostname lookup on non-Unix platforms: tries the
/// `COMPUTERNAME` env var (Windows), then `HOSTNAME`, then falls back to
/// `"unknown"`.
#[cfg(not(unix))]
fn get_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown".to_string()
}
1113
/// Map a configuration-layer role to the cluster-layer role enum. The match
/// is exhaustive on purpose: adding a `ConfigNodeRole` variant forces a
/// decision here at compile time.
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Gateway => NodeRole::Gateway,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
    }
}
1123
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    // Minimal MCP tool fixture: JSON in, JSON out, no auth/limits.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    // A fresh builder starts with no configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    // build() must fail when no config was supplied.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    // build() succeeds once a config is provided.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    // register_mcp_tool adds exactly one entry to the MCP registry.
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    // Every config role maps to its same-named cluster role.
    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}