1mod builder;
4pub use builder::ForgeBuilder;
5
6#[cfg(feature = "gateway")]
7use std::future::Future;
8use std::net::IpAddr;
9use std::path::PathBuf;
10#[cfg(feature = "gateway")]
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use uuid::Uuid;
16
17#[cfg(feature = "gateway")]
18use axum::Router;
19#[cfg(feature = "gateway")]
20use axum::body::Body;
21#[cfg(feature = "gateway")]
22use axum::http::Request;
23#[cfg(feature = "gateway")]
24use axum::response::Response;
25use tokio::sync::broadcast;
26
27use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
28use forge_core::config::ForgeConfig;
29use forge_core::error::{ForgeError, Result};
30use forge_runtime::pg::migration::{Migration, MigrationRunner, load_migrations_from_dir};
31
32#[cfg(feature = "gateway")]
33use forge_core::mcp::ForgeMcpTool;
34use forge_runtime::cluster::{
35 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, NodeRegistry, ShutdownConfig,
36};
37#[cfg(feature = "cron")]
38use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
39#[cfg(feature = "daemons")]
40use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
41use forge_runtime::function::FunctionRegistry;
42use forge_runtime::pg::Database;
43use forge_runtime::pg::{LeaderConfig, LeaderElection, PgNotifyBus};
44#[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
47use forge_core::CircuitBreakerClient;
48#[cfg(feature = "gateway")]
49use forge_runtime::gateway::{
50 AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer, TlsListenConfig,
51 bind_listener,
52};
53#[cfg(feature = "jobs")]
54use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
55#[cfg(feature = "gateway")]
56use forge_runtime::mcp::McpToolRegistry;
57#[cfg(feature = "gateway")]
58use forge_runtime::realtime::{
59 InvalidationConfig, ListenerConfig, ReactorConfig, RealtimeConfig as RuntimeRealtimeConfig,
60};
61#[cfg(feature = "gateway")]
62use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
63#[cfg(feature = "workflows")]
64use forge_runtime::workflow::{
65 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
66};
67#[cfg(feature = "workflows")]
68use tokio_util::sync::CancellationToken;
69
70use builder::{config_role_to_node_role, get_hostname};
71
/// Function-pointer type for a frontend handler.
///
/// The handler receives the incoming `Request<Body>` and returns a boxed,
/// pinned, `Send` future that resolves to a `Response`. When a handler is set
/// on [`Forge`], `run` installs it as the gateway router's fallback for `GET`
/// requests.
#[cfg(feature = "gateway")]
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
75
76pub mod prelude {
85 pub use chrono::{DateTime, Utc};
86 pub use uuid::Uuid;
87
88 pub use serde::{Deserialize, Serialize};
89 pub use serde_json;
90 pub use serde_json::Value;
91
92 pub type Timestamp = DateTime<Utc>;
93
94 pub use forge_core::auth::TokenPair;
95 pub use forge_core::config::ForgeConfig;
96 pub use forge_core::cron::{CronContext, ForgeCron};
97 pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
98 pub use forge_core::env::EnvAccess;
101 pub use forge_core::error::{ForgeError, Result};
102 pub use forge_core::function::{
103 AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
104 };
105 pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
106 pub use forge_core::mcp::{ForgeMcpTool, McpToolContext};
107 pub use forge_core::realtime::Delta;
108 pub use forge_core::schemars::JsonSchema;
109 pub use forge_core::types::Upload;
110 pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
111 pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
112
113 #[cfg(feature = "gateway")]
115 pub use axum;
116
117 pub use crate::{Forge, ForgeBuilder};
118
119 pub use forge_core::testing::{
120 TestCronContext, TestDaemonContext, TestJobContext, TestMcpToolContext,
121 TestMutationContext, TestQueryContext, TestWebhookContext, TestWorkflowContext,
122 };
123}
124
/// The assembled application: configuration, handler registries and the
/// shutdown channel.
///
/// Instances are produced through [`Forge::builder`]; [`Forge::run`] consumes
/// the value to connect to the database, apply migrations and spawn the
/// background tasks for the roles configured on this node.
pub struct Forge {
    /// Loaded configuration. `run` may overwrite `gateway.port` from the
    /// `PORT` environment variable.
    pub(super) config: ForgeConfig,
    /// Database handle; `run` sets this to `Some` once the connection is made.
    pub(super) db: Option<Database>,
    /// Identifier of this node. `run` replaces it with the id of the
    /// `NodeInfo` it creates for the local node.
    pub(super) node_id: NodeId,
    /// Registry of query and mutation handlers.
    pub(super) function_registry: FunctionRegistry,
    /// Registry of MCP tools, passed to the gateway server.
    #[cfg(feature = "gateway")]
    pub(super) mcp_registry: McpToolRegistry,
    /// Registry of job handlers, cloned into workers and the job dispatcher.
    #[cfg(feature = "jobs")]
    pub(super) job_registry: JobRegistry,
    /// Shared registry of cron handlers.
    #[cfg(feature = "cron")]
    pub(super) cron_registry: Arc<CronRegistry>,
    /// Registry of workflow definitions; persisted by `run` when non-empty.
    #[cfg(feature = "workflows")]
    pub(super) workflow_registry: WorkflowRegistry,
    /// Shared registry of daemons.
    #[cfg(feature = "daemons")]
    pub(super) daemon_registry: Arc<DaemonRegistry>,
    /// Shared registry of webhook handlers.
    #[cfg(feature = "gateway")]
    pub(super) webhook_registry: Arc<WebhookRegistry>,
    /// Broadcast sender that background tasks subscribe to for shutdown.
    pub(super) shutdown_tx: broadcast::Sender<()>,
    /// Directory from which `run` loads migrations.
    pub(super) migrations_dir: PathBuf,
    /// Additional migrations appended after the ones loaded from
    /// `migrations_dir`.
    pub(super) extra_migrations: Vec<Migration>,
    /// Optional handler installed as the router fallback for `GET` requests.
    #[cfg(feature = "gateway")]
    pub(super) frontend_handler: Option<FrontendHandler>,
    /// Optional one-shot factory that `run` calls with a clone of the primary
    /// pool to build extra routes for the gateway.
    #[cfg(feature = "gateway")]
    pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
    /// Optional role resolver that `run` hands to the gateway.
    #[cfg(feature = "gateway")]
    pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
}
153
154impl Forge {
    /// Returns a new [`ForgeBuilder`] created with `ForgeBuilder::new()`.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }
158
    /// Returns the node identifier currently stored on this instance.
    ///
    /// `run` overwrites the stored value with the id of the local `NodeInfo`
    /// it creates.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
162
    /// Borrows the configuration held by this instance.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }
166
    /// Borrows the registry of query and mutation handlers.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }
170
    /// Mutably borrows the registry of query and mutation handlers.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }
174
    /// Mutably borrows the MCP tool registry (only with the `gateway` feature).
    #[cfg(feature = "gateway")]
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }
179
180 #[cfg(feature = "gateway")]
181 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
182 self.mcp_registry.register::<T>();
183 self
184 }
185
    /// Borrows the job registry (only with the `jobs` feature).
    #[cfg(feature = "jobs")]
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }
190
    /// Mutably borrows the job registry (only with the `jobs` feature).
    #[cfg(feature = "jobs")]
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }
195
196 #[cfg(feature = "cron")]
197 pub fn cron_registry(&self) -> Arc<CronRegistry> {
198 self.cron_registry.clone()
199 }
200
    /// Borrows the workflow registry (only with the `workflows` feature).
    #[cfg(feature = "workflows")]
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }
205
    /// Mutably borrows the workflow registry (only with the `workflows` feature).
    #[cfg(feature = "workflows")]
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }
210
211 #[cfg(feature = "daemons")]
212 pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
213 self.daemon_registry.clone()
214 }
215
216 #[cfg(feature = "gateway")]
217 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
218 self.webhook_registry.clone()
219 }
220
221 #[cfg(feature = "workflows")]
224 async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
225 self.workflow_registry.persist_definitions(pool).await
226 }
227
228 pub async fn run(mut self) -> Result<()> {
230 let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
231 &self.config.observability,
232 &self.config.project.name,
233 &self.config.project.version,
234 );
235 let telemetry_result = forge_runtime::init_telemetry(
236 &telemetry_config,
237 &self.config.project.name,
238 &self.config.observability.log_level,
239 );
240 match &telemetry_result {
241 Ok(true) | Ok(false) => {
242 tracing::debug!(
243 endpoint = %telemetry_config.otlp_endpoint,
244 traces = telemetry_config.enable_traces,
245 metrics = telemetry_config.enable_metrics,
246 logs = telemetry_config.enable_logs,
247 sampling = telemetry_config.sampling_ratio,
248 "Telemetry initialized"
249 );
250 }
251 Err(e) => eprintln!("forge: failed to initialize telemetry: {e}"),
254 }
255
256 tracing::debug!("Connecting to database");
257
258 let db =
259 Database::from_config_with_service(&self.config.database, &self.config.project.name)
260 .await?;
261 let pool = db.primary().clone();
262 let _ = db.start_health_monitor(self.shutdown_tx.subscribe());
264 self.db = Some(db);
265
266 tracing::debug!("Database connected");
267
268 let runner = MigrationRunner::new(pool.clone());
269
270 let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
271 user_migrations.extend(self.extra_migrations.clone());
272
273 runner.run(user_migrations).await?;
274 tracing::debug!("Migrations applied");
275
276 #[cfg(feature = "workflows")]
277 if !self.workflow_registry.is_empty() {
278 self.persist_workflow_definitions(&pool).await?;
279 }
280
281 let hostname = get_hostname();
282
283 let ip_address: IpAddr = std::env::var("HOST")
285 .unwrap_or_else(|_| "0.0.0.0".to_string())
286 .parse()
287 .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));
288
289 if let Ok(port_str) = std::env::var("PORT")
290 && let Ok(port) = port_str.parse::<u16>()
291 {
292 self.config.gateway.port = port;
293 }
294
295 let roles: Vec<NodeRole> = self
296 .config
297 .node
298 .roles
299 .iter()
300 .map(config_role_to_node_role)
301 .collect();
302
303 let node_info = NodeInfo::new_local(
304 hostname,
305 ip_address,
306 self.config.gateway.port,
307 self.config.gateway.grpc_port,
308 roles.clone(),
309 self.config.node.worker_capabilities.clone(),
310 env!("CARGO_PKG_VERSION").to_string(),
311 );
312
313 let node_id = node_info.id;
314 self.node_id = node_id;
315
316 let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
317
318 if let Err(e) = node_registry.register().await {
319 tracing::debug!("Failed to register node (tables may not exist): {}", e);
320 }
321
322 if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
323 tracing::debug!("Failed to set node status: {}", e);
324 }
325
326 let notify_bus = Arc::new(PgNotifyBus::new(
330 pool.clone(),
331 &[
332 "forge_changes",
333 "forge_jobs_available",
334 "forge_workflow_wakeup",
335 forge_runtime::pg::LEADER_RELEASED_CHANNEL,
336 ],
337 ));
338
339 let leader_election = if roles.contains(&NodeRole::Scheduler) {
340 let election = Arc::new(
341 LeaderElection::new(
342 pool.clone(),
343 node_id,
344 LeaderRole::Scheduler,
345 LeaderConfig::default(),
346 )
347 .with_notify_bus(notify_bus.clone()),
348 );
349
350 if let Err(e) = election.try_become_leader().await {
352 tracing::debug!("Failed to acquire leadership: {}", e);
353 }
354
355 Some(election)
356 } else {
357 None
358 };
359
360 let shutdown = Arc::new(GracefulShutdown::new(
361 node_registry.clone(),
362 leader_election.clone(),
363 ShutdownConfig::default(),
364 ));
365
366 #[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
367 let http_client = CircuitBreakerClient::with_ssrf_protection();
368
369 let mut handles = Vec::new();
370 let mut leader_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
373
374 {
375 let heartbeat_pool = pool.clone();
376 let heartbeat_node_id = node_id;
377 let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
378 handles.push(tokio::spawn(async move {
379 match HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config).await {
380 Ok(heartbeat) => heartbeat.run().await,
381 Err(e) => tracing::error!(error = %e, "Failed to start heartbeat loop"),
382 }
383 }));
384 }
385
386 if let Some(ref election) = leader_election {
387 let election = election.clone();
388 handles.push(tokio::spawn(async move {
389 election.run().await;
390 }));
391 }
392
393 #[cfg(feature = "cron")]
394 {
395 forge_runtime::cron::register_cron_bridges(&self.cron_registry, &mut self.job_registry);
396 }
397
398 #[cfg(feature = "jobs")]
399 let job_queue = JobQueue::new(pool.clone());
400
401 #[cfg(feature = "gateway")]
405 let notify_bus_needs_direct_spawn = !roles.contains(&NodeRole::Gateway);
406 #[cfg(not(feature = "gateway"))]
407 let notify_bus_needs_direct_spawn = true;
408 if notify_bus_needs_direct_spawn {
409 let (bus_shutdown_tx, bus_shutdown_rx) = tokio::sync::watch::channel(false);
410 let bus_for_task = notify_bus.clone();
411 handles.push(tokio::spawn(async move {
412 bus_for_task.run(bus_shutdown_rx).await;
413 }));
414 let mut bus_broadcast_rx = self.shutdown_tx.subscribe();
415 tokio::spawn(async move {
416 let _ = bus_broadcast_rx.recv().await;
417 let _ = bus_shutdown_tx.send(true);
418 });
419 }
420
421 let kv_handle: Arc<dyn forge_core::function::KvHandle> =
422 Arc::new(forge_runtime::KvStore::new(pool.clone(), "handlers"));
423
424 #[cfg(feature = "workflows")]
428 let workflow_bridge_executor = Arc::new(
429 WorkflowExecutor::new(
430 Arc::new(self.workflow_registry.clone()),
431 pool.clone(),
432 job_queue.clone(),
433 http_client.clone(),
434 )
435 .with_kv(Arc::clone(&kv_handle)),
436 );
437 #[cfg(feature = "workflows")]
438 {
439 forge_runtime::workflow::register_workflow_bridge(
440 workflow_bridge_executor.clone(),
441 &mut self.job_registry,
442 );
443 }
444
445 #[cfg(feature = "jobs")]
449 let job_dispatcher = {
450 let job_queue_for_dispatch = JobQueue::new(pool.clone());
451 Arc::new(JobDispatcher::new(
452 job_queue_for_dispatch,
453 self.job_registry.clone(),
454 ))
455 };
456
457 #[cfg(feature = "jobs")]
460 if roles.contains(&NodeRole::Worker) {
461 let mut node_capabilities: Vec<String> = self.config.node.worker_capabilities.clone();
462 for queue_name in self.config.worker.queues.keys() {
463 if !node_capabilities.iter().any(|c| c == queue_name) {
464 node_capabilities.push(queue_name.clone());
465 }
466 }
467
468 for (queue_name, queue_cfg) in &self.config.worker.queues {
469 if queue_cfg.workers == 0 {
470 continue;
471 }
472 let worker_id = Uuid::new_v4();
473 let claim_untagged = queue_name == forge_core::config::DEFAULT_QUEUE;
474 let worker_config = WorkerConfig {
475 id: Some(worker_id),
476 capabilities: vec![queue_name.clone()],
477 claim_untagged,
478 max_concurrent: queue_cfg.workers,
479 poll_interval: *self.config.worker.poll_interval,
480 ..Default::default()
481 };
482
483 let worker_base = Worker::new(
484 worker_config,
485 job_queue.clone(),
486 self.job_registry.clone(),
487 pool.clone(),
488 notify_bus.clone(),
489 )
490 .with_kv(Arc::clone(&kv_handle))
491 .with_job_dispatch(job_dispatcher.clone());
492
493 #[cfg(feature = "workflows")]
494 let mut worker =
495 worker_base.with_workflow_dispatch(workflow_bridge_executor.clone());
496 #[cfg(not(feature = "workflows"))]
497 let mut worker = worker_base;
498
499 let queue_label = queue_name.clone();
500 handles.push(tokio::spawn(async move {
501 if let Err(e) = worker.run().await {
502 tracing::error!(queue = %queue_label, "Worker error: {}", e);
503 }
504 }));
505
506 tracing::debug!(
507 queue = %queue_name,
508 workers = queue_cfg.workers,
509 "Job worker pool started",
510 );
511 }
512
513 let total_worker_concurrency: usize =
517 self.config.worker.queues.values().map(|q| q.workers).sum();
518 const PERSISTENT_CONN_OVERHEAD: usize = 6;
519 let min_recommended = total_worker_concurrency + PERSISTENT_CONN_OVERHEAD;
520 if (self.config.database.pool_size as usize) < min_recommended {
521 tracing::warn!(
522 pool_size = self.config.database.pool_size,
523 total_worker_concurrency,
524 min_recommended,
525 "database.pool_size ({}) is below the recommended minimum ({}) for the \
526 configured worker concurrency. \
527 Formula: sum(workers per queue) + 6 = {} + 6 = {}. \
528 Increase database.pool_size to avoid connection exhaustion under load.",
529 self.config.database.pool_size,
530 min_recommended,
531 total_worker_concurrency,
532 min_recommended,
533 );
534 }
535 }
536
537 #[cfg(feature = "jobs")]
538 if roles.contains(&NodeRole::Worker) {
539 let kv_pool = pool.clone();
540 let mut kv_shutdown = self.shutdown_tx.subscribe();
541 let kv_leader = leader_election.clone();
542 handles.push(tokio::spawn(async move {
543 let kv = forge_runtime::KvStore::new(kv_pool.clone(), "app");
544 let rate_limiter = forge_runtime::StrictRateLimiter::new(kv_pool);
545 loop {
546 tokio::select! {
547 _ = kv_shutdown.recv() => break,
548 _ = tokio::time::sleep(Duration::from_secs(300)) => {}
549 }
550 let is_leader = kv_leader.as_ref().map(|e| e.is_leader()).unwrap_or(true);
551 if !is_leader {
552 continue;
553 }
554 match kv.cleanup_expired().await {
555 Ok(n) if n > 0 => tracing::debug!(count = n, "KV TTL cleanup"),
556 Err(e) => tracing::warn!(error = %e, "KV TTL cleanup failed"),
557 _ => {}
558 }
559 let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
560 match rate_limiter.cleanup(cutoff).await {
561 Ok(n) if n > 0 => tracing::debug!(count = n, "Rate limit bucket cleanup"),
562 Err(e) => tracing::warn!(error = %e, "Rate limit cleanup failed"),
563 _ => {}
564 }
565 }
566 }));
567 }
568
569 #[cfg(feature = "cron")]
570 let cron_runner_handle: Option<Arc<CronRunner>> = if roles.contains(&NodeRole::Scheduler) {
571 let cron_registry = self.cron_registry.clone();
572 let cron_pool = pool.clone();
573 let cron_leader_election = leader_election.clone();
574
575 let cron_config = CronRunnerConfig {
576 poll_interval: *self.config.cron.poll_interval,
577 node_id: node_id.as_uuid(),
578 is_leader: cron_leader_election.is_none(),
579 leader_election: cron_leader_election,
580 run_stale_threshold: Duration::from_secs(15 * 60),
581 ..Default::default()
582 };
583
584 let cron_runner = Arc::new(CronRunner::new(
585 cron_registry,
586 cron_pool,
587 job_queue.clone(),
588 cron_config,
589 ));
590 let cron_runner_clone = cron_runner.clone();
591
592 leader_handles.push(tokio::spawn(async move {
593 if let Err(e) = cron_runner_clone.run().await {
594 tracing::error!("Cron runner error: {}", e);
595 }
596 }));
597
598 tracing::debug!("Cron scheduler started");
599 Some(cron_runner)
600 } else {
601 None
602 };
603
604 #[cfg(feature = "workflows")]
605 let workflow_shutdown_token = CancellationToken::new();
606 #[cfg(feature = "workflows")]
607 if roles.contains(&NodeRole::Scheduler) {
608 let event_store = Arc::new(EventStore::new(pool.clone()));
609 let scheduler = WorkflowScheduler::new(
610 pool.clone(),
611 job_queue.clone(),
612 event_store,
613 WorkflowSchedulerConfig {
614 poll_interval: *self.config.workflow.poll_interval,
615 leader_election: leader_election.clone(),
616 ..WorkflowSchedulerConfig::default()
617 },
618 notify_bus.clone(),
619 );
620
621 let shutdown_token = workflow_shutdown_token.clone();
622 leader_handles.push(tokio::spawn(async move {
623 scheduler.run(shutdown_token).await;
624 }));
625
626 tracing::debug!("Workflow scheduler started");
627 }
628
629 #[cfg(feature = "workflows")]
630 let workflow_executor = workflow_bridge_executor;
631
632 #[cfg(feature = "daemons")]
633 if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
634 let daemon_registry = self.daemon_registry.clone();
635 let daemon_pool = pool.clone();
636 let daemon_http = http_client.clone();
637 let daemon_shutdown_rx = self.shutdown_tx.subscribe();
638
639 let daemon_runner = DaemonRunner::new(
640 daemon_registry,
641 daemon_pool,
642 daemon_http,
643 node_id.as_uuid(),
644 daemon_shutdown_rx,
645 )
646 .with_config(forge_runtime::daemon::DaemonRunnerConfig {
647 health_check_interval: *self.config.daemon.health_check_interval,
648 heartbeat_interval: *self.config.daemon.heartbeat_interval,
649 });
650 #[cfg(feature = "jobs")]
651 let daemon_runner = daemon_runner.with_job_dispatch(job_dispatcher.clone());
652 #[cfg(feature = "workflows")]
653 let daemon_runner = daemon_runner.with_workflow_dispatch(workflow_executor.clone());
654 let daemon_runner = daemon_runner.with_kv(Arc::clone(&kv_handle));
655
656 leader_handles.push(tokio::spawn(async move {
657 if let Err(e) = daemon_runner.run().await {
658 tracing::error!("Daemon runner error: {}", e);
659 }
660 }));
661
662 tracing::debug!("Daemon runner started");
663 }
664
665 #[cfg(feature = "gateway")]
666 let mut reactor_handle = None;
667
668 #[cfg(feature = "gateway")]
669 if roles.contains(&NodeRole::Gateway) {
670 let tls: Option<TlsListenConfig> =
674 TlsListenConfig::from_core(&self.config.gateway.tls)?;
675
676 let any_requires_auth = self
679 .function_registry
680 .queries()
681 .any(|(_, info)| !info.is_public || info.required_role.is_some())
682 || self
683 .function_registry
684 .mutations()
685 .any(|(_, info)| !info.is_public || info.required_role.is_some());
686
687 if any_requires_auth && !self.config.auth.is_configured() {
688 return Err(ForgeError::config(
689 "One or more handlers require authentication (private scope or require_role) \
690 but auth is not configured. Set auth.jwt_secret (≥32 bytes) for HMAC or \
691 auth.jwks_url for external identity providers.",
692 ));
693 }
694
695 if self.config.gateway.cors_enabled
698 && self.config.gateway.cors_origins.iter().any(|o| o == "*")
699 {
700 let forge_env = std::env::var("FORGE_ENV").ok();
701 let is_dev = forge_env
702 .as_deref()
703 .is_some_and(|v| v.eq_ignore_ascii_case("development"));
704 if !is_dev {
705 let production_indicators = [
706 ("FORGE_ENV", std::env::var("FORGE_ENV").ok()),
707 ("NODE_ENV", std::env::var("NODE_ENV").ok()),
708 (
709 "RAILWAY_ENVIRONMENT",
710 std::env::var("RAILWAY_ENVIRONMENT").ok(),
711 ),
712 ("K_SERVICE", std::env::var("K_SERVICE").ok()),
713 ("FLY_APP_NAME", std::env::var("FLY_APP_NAME").ok()),
714 (
715 "KUBERNETES_SERVICE_HOST",
716 std::env::var("KUBERNETES_SERVICE_HOST").ok(),
717 ),
718 ("AWS_EXECUTION_ENV", std::env::var("AWS_EXECUTION_ENV").ok()),
719 ];
720 let hint = production_indicators
721 .iter()
722 .find_map(|(name, val)| {
723 val.as_ref().map(|v| format!(" ({name}={v} detected)"))
724 })
725 .unwrap_or_default();
726 return Err(ForgeError::config(format!(
727 "gateway.cors_origins = [\"*\"] is only allowed when FORGE_ENV=development{hint}. \
728 Set explicit origins (e.g. cors_origins = [\"https://yourdomain.com\"])."
729 )));
730 }
731 }
732
733 let gateway_config = RuntimeGatewayConfig {
734 port: self.config.gateway.port,
735 max_connections: self.config.gateway.max_connections,
736 sse_max_sessions: self.config.realtime.sse_max_sessions,
737 request_timeout_secs: self.config.gateway.request_timeout.as_secs(),
738 cors_enabled: self.config.gateway.cors_enabled,
739 cors_origins: self.config.gateway.cors_origins.clone(),
740 auth: AuthConfig::from_forge_config(&self.config.auth)
741 .map_err(|e| ForgeError::config(e.to_string()))?,
742 mcp: self.config.mcp.clone(),
743 quiet_paths: self.config.gateway.quiet_paths.clone(),
744 max_body_size_bytes: self.config.gateway.max_body_size.as_bytes(),
745 max_json_body_bytes: self.config.gateway.max_json_body_size.as_bytes(),
746 max_file_size_bytes: self.config.gateway.max_file_size.as_bytes(),
747 token_ttl: forge_core::AuthTokenTtl::new(
748 self.config.auth.access_token_ttl_secs(),
749 self.config.auth.refresh_token_ttl_days(),
750 ),
751 project_name: self.config.project.name.clone(),
752 tls,
753 reactor_config: {
754 let rt = &self.config.realtime;
755 ReactorConfig {
756 listener: ListenerConfig {
757 buffer_size: rt.postgres_change_buffer_size,
758 ..ListenerConfig::default()
759 },
760 invalidation: InvalidationConfig {
761 debounce_ms: rt.debounce_quiet_window.as_millis(),
762 max_debounce_ms: rt.debounce_max_wait.as_millis(),
763 ..InvalidationConfig::default()
764 },
765 realtime: RuntimeRealtimeConfig {
766 max_subscriptions_per_session: rt.subscription_max_per_session,
767 },
768 max_concurrent_reexecutions: rt.max_concurrent_reexecutions,
769 resync_interval_secs: rt.resync_interval.as_secs(),
770 shard_count: rt.shard_count,
771 ..ReactorConfig::default()
772 }
773 },
774 max_multipart_fields: self.config.gateway.max_multipart_fields,
775 max_sessions_per_user: self.config.realtime.max_sessions_per_user,
776 max_sessions_per_ip: self.config.realtime.max_sessions_per_ip,
777 max_subscriptions_per_user: self.config.realtime.max_subscriptions_per_user,
778 security_headers: self.config.gateway.security_headers,
779 hsts: self.config.gateway.hsts,
780 trusted_proxies: self
781 .config
782 .gateway
783 .trusted_proxies
784 .iter()
785 .filter_map(|s| {
786 s.parse::<ipnet::IpNet>()
787 .or_else(|_| s.parse::<std::net::IpAddr>().map(ipnet::IpNet::from))
788 .ok()
789 })
790 .collect(),
791 max_jobs_per_request: self.config.gateway.max_jobs_per_request,
792 max_result_size_bytes: self.config.gateway.max_result_size_bytes,
793 max_json_depth: self.config.gateway.max_json_depth,
794 };
795
796 let db_ref = self
797 .db
798 .clone()
799 .ok_or_else(|| ForgeError::internal("Database not initialized"))?;
800
801 let gateway = GatewayServer::new(
802 gateway_config,
803 self.function_registry.clone(),
804 db_ref.clone(),
805 notify_bus.clone(),
806 )
807 .with_node_id(self.node_id);
808 #[cfg(feature = "jobs")]
809 let gateway = gateway.with_job_dispatcher(job_dispatcher.clone());
810 #[cfg(feature = "workflows")]
811 let gateway = gateway.with_workflow_dispatcher(workflow_executor.clone());
812 let gateway = gateway.with_kv(Arc::clone(&kv_handle));
813 let mut gateway = gateway.with_mcp_registry(self.mcp_registry.clone());
814
815 if matches!(
816 self.config.rate_limit.mode,
817 forge_core::config::RateLimitMode::Hybrid
818 ) {
819 #[allow(clippy::disallowed_methods)]
821 let active_nodes: Option<i64> =
822 sqlx::query_scalar("SELECT COUNT(*) FROM forge_nodes WHERE status = 'active'")
823 .fetch_one(db_ref.primary())
824 .await
825 .ok();
826 if active_nodes.is_some_and(|n| n > 1) {
827 let n = active_nodes.unwrap_or(0);
828 tracing::warn!(
829 active_nodes = n,
830 "rate_limit.mode is 'hybrid' but {n} active nodes detected. \
831 Per-user/per-IP limits are local-only and effectively multiply by the \
832 node count. Set rate_limit.mode = \"strict\" for cluster deployments."
833 );
834 }
835 }
836
837 let rate_limiter: std::sync::Arc<dyn forge_core::rate_limit::RateLimiterBackend> =
838 match self.config.rate_limit.mode {
839 forge_core::config::RateLimitMode::Strict => std::sync::Arc::new(
840 forge_runtime::StrictRateLimiter::new(db_ref.primary().clone()),
841 ),
842 forge_core::config::RateLimitMode::Hybrid => {
843 std::sync::Arc::new(forge_runtime::HybridRateLimiter::with_max_buckets(
844 db_ref.primary().clone(),
845 self.config.rate_limit.max_local_buckets,
846 ))
847 }
848 };
849 gateway = gateway.with_rate_limiter(rate_limiter);
850 if let Some(resolver) = self.role_resolver.take() {
851 gateway = gateway.with_role_resolver(resolver);
852 }
853 if self.config.signals.enabled {
854 let signals_pool = std::sync::Arc::new(db_ref.primary().clone());
855 let collector = forge_runtime::signals::SignalsCollector::spawn(
856 signals_pool.clone(),
857 self.config.signals.batch_size,
858 *self.config.signals.flush_interval,
859 self.config.signals.channel_capacity,
860 );
861 let geoip = match &self.config.signals.geoip_db_path {
864 Some(path) => {
865 let resolver = forge_runtime::signals::geoip::GeoIpResolver::from_mmdb(
866 std::path::Path::new(path),
867 )?;
868 tracing::info!(path, "GeoIP: MaxMind MMDB loaded (city-level)");
869 resolver
870 }
871 None => forge_runtime::signals::geoip::GeoIpResolver::new(),
872 };
873 gateway = gateway
874 .with_signals_collector(collector)
875 .with_signals_anonymize_ip(self.config.signals.anonymize_ip)
876 .with_signals_geoip(geoip);
877
878 forge_runtime::signals::session::spawn_session_reaper(
879 signals_pool.clone(),
880 (self.config.signals.session_timeout.as_secs() / 60) as u32,
881 );
882
883 forge_runtime::signals::partition::ensure_partitions(&signals_pool).await;
884
885 {
886 let partition_pool = signals_pool.clone();
887 let retention_days = self.config.signals.retention_days;
888 let partition_leader = leader_election.clone();
889 let mut partition_shutdown = self.shutdown_tx.subscribe();
890 handles.push(tokio::spawn(async move {
891 loop {
892 tokio::select! {
893 _ = partition_shutdown.recv() => break,
894 _ = tokio::time::sleep(Duration::from_secs(21_600)) => {}
895 }
896 let is_leader = partition_leader
897 .as_ref()
898 .map(|e| e.is_leader())
899 .unwrap_or(true);
900 if is_leader {
901 forge_runtime::signals::partition::ensure_partitions(
902 &partition_pool,
903 )
904 .await;
905 forge_runtime::signals::partition::drop_old_partitions(
906 &partition_pool,
907 retention_days,
908 )
909 .await;
910 forge_runtime::signals::partition::check_default_partition(
911 &partition_pool,
912 )
913 .await;
914 }
915 }
916 }));
917 }
918
919 tracing::info!("Signals enabled (analytics + diagnostics)");
920 }
921
922 let mut custom_routes: Option<Router> = self
927 .custom_routes_factory
928 .take()
929 .map(|factory| factory(pool.clone()));
930
931 if !self.webhook_registry.is_empty() {
932 use axum::extract::DefaultBodyLimit;
933 use axum::routing::post;
934
935 let webhook_state = WebhookState::new(self.webhook_registry.clone(), pool.clone());
936 #[cfg(feature = "jobs")]
937 let webhook_state = webhook_state.with_job_dispatcher(job_dispatcher.clone());
938 #[cfg(feature = "workflows")]
939 let webhook_state =
940 webhook_state.with_workflow_dispatcher(workflow_executor.clone());
941 let webhook_state = webhook_state.with_kv(Arc::clone(&kv_handle));
942 let webhook_state = Arc::new(webhook_state);
943
944 let webhook_routes = Router::new()
945 .route(
946 "/webhooks/{*path}",
947 post(webhook_handler).with_state(webhook_state),
948 )
949 .layer(DefaultBodyLimit::max(1024 * 1024));
950
951 custom_routes = Some(match custom_routes {
952 Some(existing) => existing.merge(webhook_routes),
953 None => webhook_routes,
954 });
955
956 tracing::debug!(
957 webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
958 "Webhook routes registered"
959 );
960 }
961
962 if let Some(routes) = custom_routes {
963 gateway = gateway.with_custom_routes(routes);
964 tracing::debug!("Custom and webhook routes merged into gateway middleware stack");
965 }
966
967 let reactor = gateway.reactor();
968 if let Err(e) = reactor.start().await {
969 tracing::error!("Failed to start reactor: {}", e);
970 } else {
971 tracing::debug!("Reactor started");
972 reactor_handle = Some(reactor);
973 }
974
975 let api_router = gateway.router();
976 let mut router = Router::new().nest("/_api", api_router);
977
978 if self.config.mcp.enabled {
979 use axum::routing::get;
980
981 async fn oauth_not_supported() -> impl axum::response::IntoResponse {
984 (
985 axum::http::StatusCode::NOT_FOUND,
986 axum::Json(serde_json::json!({
987 "error": "oauth_not_supported",
988 "error_description": "This server does not support OAuth. Connect without authentication."
989 })),
990 )
991 }
992
993 #[cfg(feature = "mcp-oauth")]
994 if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
995 router = router.nest("/_api", oauth_api_router);
996 router = router
997 .route(
998 "/.well-known/oauth-authorization-server",
999 get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
1000 .with_state(oauth_state.clone()),
1001 )
1002 .route(
1003 "/.well-known/oauth-protected-resource",
1004 get(forge_runtime::gateway::oauth::well_known_resource_metadata)
1005 .with_state(oauth_state),
1006 );
1007
1008 tracing::info!("OAuth 2.1 endpoints enabled for MCP");
1009 } else {
1010 router = router
1011 .route(
1012 "/.well-known/oauth-authorization-server",
1013 get(oauth_not_supported),
1014 )
1015 .route(
1016 "/.well-known/oauth-protected-resource",
1017 get(oauth_not_supported),
1018 );
1019 }
1020
1021 #[cfg(not(feature = "mcp-oauth"))]
1022 {
1023 router = router
1024 .route(
1025 "/.well-known/oauth-authorization-server",
1026 get(oauth_not_supported),
1027 )
1028 .route(
1029 "/.well-known/oauth-protected-resource",
1030 get(oauth_not_supported),
1031 );
1032 }
1033 }
1034
1035 if let Some(handler) = self.frontend_handler {
1036 use axum::routing::get;
1037 router = router.fallback(get(handler));
1038 tracing::debug!("Frontend handler enabled");
1039 }
1040
1041 let addr = gateway.addr();
1042 let tls = gateway.tls().cloned();
1043 let mut gateway_shutdown_rx = shutdown.subscribe();
1047
1048 handles.push(tokio::spawn(async move {
1049 tracing::debug!(addr = %addr, "Gateway server binding");
1050 let listener = match bind_listener(addr, tls.as_ref()).await {
1051 Ok(l) => l,
1052 Err(e) => {
1053 tracing::error!(error = %e, "Failed to bind gateway listener");
1054 return;
1055 }
1056 };
1057 let serve = axum::serve(listener, router).with_graceful_shutdown(async move {
1058 let _ = gateway_shutdown_rx.wait_for(|v| *v).await;
1059 tracing::debug!("Gateway draining in-flight requests");
1060 });
1061 if let Err(e) = serve.await {
1062 tracing::error!("Gateway server error: {}", e);
1063 }
1064 }));
1065 }
1066
1067 #[cfg(feature = "jobs")]
1068 let jobs_count = self.job_registry.len();
1069 #[cfg(not(feature = "jobs"))]
1070 let jobs_count: usize = 0;
1071 #[cfg(feature = "cron")]
1072 let crons_count = self.cron_registry.len();
1073 #[cfg(not(feature = "cron"))]
1074 let crons_count: usize = 0;
1075 #[cfg(feature = "workflows")]
1076 let workflows_count = self.workflow_registry.len();
1077 #[cfg(not(feature = "workflows"))]
1078 let workflows_count: usize = 0;
1079 #[cfg(feature = "daemons")]
1080 let daemons_count = self.daemon_registry.len();
1081 #[cfg(not(feature = "daemons"))]
1082 let daemons_count: usize = 0;
1083 #[cfg(feature = "gateway")]
1084 let webhooks_count = self.webhook_registry.len();
1085 #[cfg(not(feature = "gateway"))]
1086 let webhooks_count: usize = 0;
1087 #[cfg(feature = "gateway")]
1088 let mcp_tools_count = self.mcp_registry.len();
1089 #[cfg(not(feature = "gateway"))]
1090 let mcp_tools_count: usize = 0;
1091
1092 tracing::info!(
1093 queries = self.function_registry.queries().count(),
1094 mutations = self.function_registry.mutations().count(),
1095 jobs = jobs_count,
1096 crons = crons_count,
1097 workflows = workflows_count,
1098 daemons = daemons_count,
1099 webhooks = webhooks_count,
1100 mcp_tools = mcp_tools_count,
1101 "Functions registered"
1102 );
1103
1104 {
1105 let pool = pool.clone();
1106 tokio::spawn(async move {
1107 loop {
1108 tokio::time::sleep(Duration::from_secs(15)).await;
1109 forge_runtime::observability::record_pool_metrics(&pool);
1110 }
1111 });
1112 }
1113
1114 let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
1115 let capabilities = &self.config.node.worker_capabilities;
1116 tracing::info!(
1117 node_id = %node_id,
1118 project = %self.config.project.name,
1119 version = env!("CARGO_PKG_VERSION"),
1120 roles = ?role_names,
1121 worker_capabilities = ?capabilities,
1122 port = self.config.gateway.port,
1123 db_pool_size = self.config.database.pool_size,
1124 cluster_discovery = ?self.config.cluster.discovery,
1125 observability = self.config.observability.enabled,
1126 mcp = self.config.mcp.enabled,
1127 "Forge started"
1128 );
1129
1130 let mut shutdown_rx = self.shutdown_tx.subscribe();
1131
1132 tokio::select! {
1133 _ = tokio::signal::ctrl_c() => {
1134 tracing::debug!("Received ctrl-c");
1135 }
1136 _ = shutdown_rx.recv() => {
1137 tracing::debug!("Received shutdown notification");
1138 }
1139 }
1140
1141 tracing::debug!("Graceful shutdown starting");
1142
1143 let _ = self.shutdown_tx.send(());
1145
1146 #[cfg(feature = "workflows")]
1148 workflow_shutdown_token.cancel();
1149
1150 #[cfg(feature = "cron")]
1151 if let Some(ref runner) = cron_runner_handle {
1152 runner.stop().await;
1153 }
1154
1155 tracing::debug!("Waiting for leader-held tasks to drain");
1158 for handle in leader_handles {
1159 let _ = handle.await;
1160 }
1161 tracing::debug!("Leader-held tasks drained");
1162
1163 if let Err(e) = shutdown.shutdown().await {
1165 tracing::warn!(error = %e, "Shutdown error");
1166 }
1167
1168 if let Some(ref election) = leader_election {
1169 election.stop();
1170 }
1171
1172 #[cfg(feature = "gateway")]
1173 if let Some(ref reactor) = reactor_handle {
1174 reactor.stop();
1175 }
1176
1177 if let Some(ref db) = self.db {
1178 db.close().await;
1179 }
1180
1181 forge_runtime::shutdown_telemetry();
1182 tracing::info!("Forge stopped");
1183 Ok(())
1184 }
1185
1186 pub fn shutdown(&self) {
1188 let _ = self.shutdown_tx.send(());
1189 }
1190}
1191
/// Unit tests for the builder entry points and the config-role conversion.
///
/// Everything that touches MCP tooling is gated on the `gateway` feature: the
/// parent module only imports `ForgeMcpTool` under that feature, so referencing
/// it unconditionally would stop the whole test module from compiling when the
/// crate is tested without `gateway`.
#[cfg(test)]
// NOTE(review): no test below currently unwraps or indexes; the allowance is
// kept so that future tests may do so without tripping the crate's clippy
// configuration — confirm it is still wanted.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    #[cfg(feature = "gateway")]
    use std::future::Future;
    #[cfg(feature = "gateway")]
    use std::pin::Pin;

    use forge_core::config::NodeRole as ConfigNodeRole;
    #[cfg(feature = "gateway")]
    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool used only to exercise tool registration on the builder.
    #[cfg(feature = "gateway")]
    struct TestMcpTool;

    #[cfg(feature = "gateway")]
    impl forge_core::__sealed::Sealed for TestMcpTool {}

    #[cfg(feature = "gateway")]
    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        /// Static metadata with every optional field left unset.
        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        /// Ignores its inputs and resolves to the JSON object `{"ok": true}`.
        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    /// A freshly created builder carries no configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    /// Building without a configuration is rejected.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    /// Building succeeds once a configuration has been supplied.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    /// Registering one MCP tool leaves exactly one entry in the MCP registry.
    #[cfg(feature = "gateway")]
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    /// Each configuration role maps onto the cluster role of the same name.
    #[test]
    fn test_config_role_conversion() {
        use builder::config_role_to_node_role;

        let cases = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
            (ConfigNodeRole::Function, NodeRole::Function),
        ];

        for (config_role, expected) in cases {
            assert_eq!(config_role_to_node_role(&config_role), expected);
        }
    }
}