1mod builder;
4pub use builder::ForgeBuilder;
5
6#[cfg(feature = "gateway")]
7use std::future::Future;
8use std::net::IpAddr;
9use std::path::PathBuf;
10#[cfg(feature = "gateway")]
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use uuid::Uuid;
16
17#[cfg(feature = "gateway")]
18use axum::Router;
19#[cfg(feature = "gateway")]
20use axum::body::Body;
21#[cfg(feature = "gateway")]
22use axum::http::Request;
23#[cfg(feature = "gateway")]
24use axum::response::Response;
25use tokio::sync::broadcast;
26
27use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
28use forge_core::config::ForgeConfig;
29use forge_core::error::{ForgeError, Result};
30use forge_runtime::pg::migration::{Migration, MigrationRunner, load_migrations_from_dir};
31
32#[cfg(feature = "gateway")]
33use forge_core::mcp::ForgeMcpTool;
34use forge_runtime::cluster::{
35 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, NodeRegistry, ShutdownConfig,
36};
37#[cfg(feature = "cron")]
38use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
39#[cfg(feature = "daemons")]
40use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
41use forge_runtime::function::FunctionRegistry;
42use forge_runtime::pg::Database;
43use forge_runtime::pg::{LeaderConfig, LeaderElection, PgNotifyBus};
44#[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
47use forge_core::CircuitBreakerClient;
48#[cfg(feature = "gateway")]
49use forge_runtime::gateway::{
50 AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer, TlsListenConfig,
51 bind_listener,
52};
53#[cfg(feature = "jobs")]
54use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
55#[cfg(feature = "gateway")]
56use forge_runtime::mcp::McpToolRegistry;
57#[cfg(feature = "gateway")]
58use forge_runtime::realtime::{
59 InvalidationConfig, ListenerConfig, ReactorConfig, RealtimeConfig as RuntimeRealtimeConfig,
60};
61#[cfg(feature = "gateway")]
62use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
63#[cfg(feature = "workflows")]
64use forge_runtime::workflow::{
65 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
66};
67#[cfg(feature = "workflows")]
68use tokio_util::sync::CancellationToken;
69
70use builder::{config_role_to_node_role, get_hostname};
71
/// Signature of a frontend request handler: a plain function pointer that
/// takes an axum [`Request<Body>`] and returns a boxed, `Send` future
/// resolving to a [`Response`].
///
/// Only available when the `gateway` feature is enabled.
#[cfg(feature = "gateway")]
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
75
/// Convenience re-exports so application code can bring the commonly used
/// types, traits and test contexts into scope with a single glob import.
pub mod prelude {
    // Date/time and identifier types.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serialization.
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;
    pub use serde_json::Value;

    /// A UTC timestamp (`chrono::DateTime<Utc>`).
    pub type Timestamp = DateTime<Utc>;

    // Core traits, contexts and supporting types from `forge_core`.
    pub use forge_core::auth::TokenPair;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, DbConn, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // The whole `axum` crate is re-exported only when the gateway is compiled in.
    #[cfg(feature = "gateway")]
    pub use axum;

    // Runtime entry points defined in this crate.
    pub use crate::{Forge, ForgeBuilder};

    // Test doubles for each handler context kind.
    pub use forge_core::testing::{
        TestCronContext, TestDaemonContext, TestJobContext, TestMcpToolContext,
        TestMutationContext, TestQueryContext, TestWebhookContext, TestWorkflowContext,
    };
}
124
/// The assembled runtime: configuration, handler registries and the
/// resources needed to start a node via [`Forge::run`].
///
/// Instances are created through [`Forge::builder`]. Several fields exist
/// only when the matching Cargo feature is enabled.
pub struct Forge {
    /// Application configuration; `run` may override `gateway.port` from the
    /// `PORT` environment variable.
    pub(super) config: ForgeConfig,
    /// Database handle; assigned by `run` once the connection is established.
    pub(super) db: Option<Database>,
    /// Identifier of this node; `run` overwrites it with the id of the
    /// `NodeInfo` it builds.
    pub(super) node_id: NodeId,
    /// Registered query and mutation handlers.
    pub(super) function_registry: FunctionRegistry,
    /// Registered MCP tools.
    #[cfg(feature = "gateway")]
    pub(super) mcp_registry: McpToolRegistry,
    /// Registered job handlers.
    #[cfg(feature = "jobs")]
    pub(super) job_registry: JobRegistry,
    /// Registered cron handlers (shared).
    #[cfg(feature = "cron")]
    pub(super) cron_registry: Arc<CronRegistry>,
    /// Registered workflow definitions.
    #[cfg(feature = "workflows")]
    pub(super) workflow_registry: WorkflowRegistry,
    /// Registered daemons (shared).
    #[cfg(feature = "daemons")]
    pub(super) daemon_registry: Arc<DaemonRegistry>,
    /// Registered webhook handlers (shared).
    #[cfg(feature = "gateway")]
    pub(super) webhook_registry: Arc<WebhookRegistry>,
    /// Broadcast sender that background tasks subscribe to for shutdown.
    pub(super) shutdown_tx: broadcast::Sender<()>,
    /// Directory from which `run` loads user migrations.
    pub(super) migrations_dir: PathBuf,
    /// Additional migrations appended after those loaded from `migrations_dir`.
    pub(super) extra_migrations: Vec<Migration>,
    /// Optional frontend handler.
    // NOTE(review): presumably serves non-API requests; its use is outside this view — confirm.
    #[cfg(feature = "gateway")]
    pub(super) frontend_handler: Option<FrontendHandler>,
    /// Optional factory that `run` calls with the database pool; the returned
    /// router is handed to the gateway as custom routes.
    #[cfg(feature = "gateway")]
    pub(super) custom_routes_factory: Option<Box<dyn FnOnce(sqlx::PgPool) -> Router + Send + Sync>>,
    /// Optional role resolver that `run` passes on to the gateway.
    #[cfg(feature = "gateway")]
    pub(super) role_resolver: Option<forge_core::SharedRoleResolver>,
}
153
154impl Forge {
    /// Creates a fresh [`ForgeBuilder`] via `ForgeBuilder::new()`.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }
158
    /// Returns this instance's node identifier (by value).
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
162
    /// Borrows the configuration this instance holds.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }
166
    /// Borrows the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }
170
    /// Mutably borrows the function registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }
174
    /// Mutably borrows the MCP tool registry (requires the `gateway` feature).
    #[cfg(feature = "gateway")]
    pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
        &mut self.mcp_registry
    }
179
180 #[cfg(feature = "gateway")]
181 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
182 self.mcp_registry.register::<T>();
183 self
184 }
185
    /// Borrows the job registry (requires the `jobs` feature).
    #[cfg(feature = "jobs")]
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }
190
    /// Mutably borrows the job registry (requires the `jobs` feature).
    #[cfg(feature = "jobs")]
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }
195
196 #[cfg(feature = "cron")]
197 pub fn cron_registry(&self) -> Arc<CronRegistry> {
198 self.cron_registry.clone()
199 }
200
    /// Borrows the workflow registry (requires the `workflows` feature).
    #[cfg(feature = "workflows")]
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }
205
    /// Mutably borrows the workflow registry (requires the `workflows` feature).
    #[cfg(feature = "workflows")]
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }
210
211 #[cfg(feature = "daemons")]
212 pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
213 self.daemon_registry.clone()
214 }
215
216 #[cfg(feature = "gateway")]
217 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
218 self.webhook_registry.clone()
219 }
220
221 #[cfg(feature = "workflows")]
224 async fn persist_workflow_definitions(&self, pool: &sqlx::PgPool) -> Result<()> {
225 for info in self.workflow_registry.definitions() {
226 let status = info.status.as_str();
227
228 let existing = sqlx::query!(
230 r#"
231 SELECT workflow_signature FROM forge_workflow_definitions
232 WHERE workflow_name = $1 AND workflow_version = $2
233 "#,
234 info.name,
235 info.version,
236 )
237 .fetch_optional(pool)
238 .await
239 .map_err(ForgeError::Database)?;
240
241 if let Some(row) = existing {
242 if row.workflow_signature != info.signature {
243 return Err(ForgeError::config(format!(
244 "Workflow '{}' version '{}' has a different signature than previously registered. \
245 Persisted contract changed under the same version. \
246 Expected signature: {}, got: {}. \
247 Create a new version instead of modifying the existing one.",
248 info.name, info.version, row.workflow_signature, info.signature
249 )));
250 }
251 sqlx::query!(
252 "UPDATE forge_workflow_definitions SET status = $3 WHERE workflow_name = $1 AND workflow_version = $2",
253 info.name,
254 info.version,
255 status,
256 )
257 .execute(pool)
258 .await
259 .map_err(ForgeError::Database)?;
260 } else {
261 sqlx::query!(
262 r#"
263 INSERT INTO forge_workflow_definitions (workflow_name, workflow_version, workflow_signature, status)
264 VALUES ($1, $2, $3, $4)
265 "#,
266 info.name,
267 info.version,
268 info.signature,
269 status,
270 )
271 .execute(pool)
272 .await
273 .map_err(ForgeError::Database)?;
274 }
275
276 tracing::debug!(
277 workflow = info.name,
278 version = info.version,
279 signature = info.signature,
280 status = status,
281 "Workflow definition registered"
282 );
283 }
284
285 Ok(())
286 }
287
288 pub async fn run(mut self) -> Result<()> {
290 let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
291 &self.config.observability,
292 &self.config.project.name,
293 &self.config.project.version,
294 );
295 let telemetry_result = forge_runtime::init_telemetry(
296 &telemetry_config,
297 &self.config.project.name,
298 &self.config.observability.log_level,
299 );
300 match &telemetry_result {
301 Ok(true) | Ok(false) => {
302 tracing::debug!(
303 endpoint = %telemetry_config.otlp_endpoint,
304 traces = telemetry_config.enable_traces,
305 metrics = telemetry_config.enable_metrics,
306 logs = telemetry_config.enable_logs,
307 sampling = telemetry_config.sampling_ratio,
308 "Telemetry initialized"
309 );
310 }
311 Err(e) => eprintln!("forge: failed to initialize telemetry: {e}"),
314 }
315
316 tracing::debug!("Connecting to database");
317
318 let db =
319 Database::from_config_with_service(&self.config.database, &self.config.project.name)
320 .await?;
321 let pool = db.primary().clone();
322 let _ = db.start_health_monitor(self.shutdown_tx.subscribe());
324 self.db = Some(db);
325
326 tracing::debug!("Database connected");
327
328 let runner = MigrationRunner::new(pool.clone());
329
330 let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
331 user_migrations.extend(self.extra_migrations.clone());
332
333 runner.run(user_migrations).await?;
334 tracing::debug!("Migrations applied");
335
336 #[cfg(feature = "workflows")]
337 if !self.workflow_registry.is_empty() {
338 self.persist_workflow_definitions(&pool).await?;
339 }
340
341 let hostname = get_hostname();
342
343 let ip_address: IpAddr = std::env::var("HOST")
345 .unwrap_or_else(|_| "0.0.0.0".to_string())
346 .parse()
347 .unwrap_or_else(|_| "0.0.0.0".parse().expect("valid IP literal"));
348
349 if let Ok(port_str) = std::env::var("PORT")
350 && let Ok(port) = port_str.parse::<u16>()
351 {
352 self.config.gateway.port = port;
353 }
354
355 let roles: Vec<NodeRole> = self
356 .config
357 .node
358 .roles
359 .iter()
360 .map(config_role_to_node_role)
361 .collect();
362
363 let node_info = NodeInfo::new_local(
364 hostname,
365 ip_address,
366 self.config.gateway.port,
367 self.config.gateway.grpc_port,
368 roles.clone(),
369 self.config.node.worker_capabilities.clone(),
370 env!("CARGO_PKG_VERSION").to_string(),
371 );
372
373 let node_id = node_info.id;
374 self.node_id = node_id;
375
376 let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
377
378 if let Err(e) = node_registry.register().await {
379 tracing::debug!("Failed to register node (tables may not exist): {}", e);
380 }
381
382 if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
383 tracing::debug!("Failed to set node status: {}", e);
384 }
385
386 let notify_bus = Arc::new(PgNotifyBus::new(
390 pool.clone(),
391 &[
392 "forge_changes",
393 "forge_jobs_available",
394 "forge_workflow_wakeup",
395 forge_runtime::pg::LEADER_RELEASED_CHANNEL,
396 ],
397 ));
398
399 let leader_election = if roles.contains(&NodeRole::Scheduler) {
400 let election = Arc::new(
401 LeaderElection::new(
402 pool.clone(),
403 node_id,
404 LeaderRole::Scheduler,
405 LeaderConfig::default(),
406 )
407 .with_notify_bus(notify_bus.clone()),
408 );
409
410 if let Err(e) = election.try_become_leader().await {
412 tracing::debug!("Failed to acquire leadership: {}", e);
413 }
414
415 Some(election)
416 } else {
417 None
418 };
419
420 let shutdown = Arc::new(GracefulShutdown::new(
421 node_registry.clone(),
422 leader_election.clone(),
423 ShutdownConfig::default(),
424 ));
425
426 #[cfg(any(feature = "cron", feature = "daemons", feature = "workflows"))]
427 let http_client = CircuitBreakerClient::with_ssrf_protection();
428
429 let mut handles = Vec::new();
430 let mut leader_handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
433
434 {
435 let heartbeat_pool = pool.clone();
436 let heartbeat_node_id = node_id;
437 let config = HeartbeatConfig::from_cluster_config(&self.config.cluster);
438 handles.push(tokio::spawn(async move {
439 match HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config).await {
440 Ok(heartbeat) => heartbeat.run().await,
441 Err(e) => tracing::error!(error = %e, "Failed to start heartbeat loop"),
442 }
443 }));
444 }
445
446 if let Some(ref election) = leader_election {
447 let election = election.clone();
448 handles.push(tokio::spawn(async move {
449 election.run().await;
450 }));
451 }
452
453 #[cfg(feature = "cron")]
454 {
455 forge_runtime::cron::register_cron_bridges(&self.cron_registry, &mut self.job_registry);
456 }
457
458 #[cfg(feature = "jobs")]
459 let job_queue = JobQueue::new(pool.clone());
460
461 #[cfg(feature = "gateway")]
465 let notify_bus_needs_direct_spawn = !roles.contains(&NodeRole::Gateway);
466 #[cfg(not(feature = "gateway"))]
467 let notify_bus_needs_direct_spawn = true;
468 if notify_bus_needs_direct_spawn {
469 let (bus_shutdown_tx, bus_shutdown_rx) = tokio::sync::watch::channel(false);
470 let bus_for_task = notify_bus.clone();
471 handles.push(tokio::spawn(async move {
472 bus_for_task.run(bus_shutdown_rx).await;
473 }));
474 let mut bus_broadcast_rx = self.shutdown_tx.subscribe();
475 tokio::spawn(async move {
476 let _ = bus_broadcast_rx.recv().await;
477 let _ = bus_shutdown_tx.send(true);
478 });
479 }
480
481 let kv_handle: Arc<dyn forge_core::function::KvHandle> =
482 Arc::new(forge_runtime::KvStore::new(pool.clone(), "handlers"));
483
484 #[cfg(feature = "workflows")]
488 let workflow_bridge_executor = Arc::new(
489 WorkflowExecutor::new(
490 Arc::new(self.workflow_registry.clone()),
491 pool.clone(),
492 job_queue.clone(),
493 http_client.clone(),
494 )
495 .with_kv(Arc::clone(&kv_handle)),
496 );
497 #[cfg(feature = "workflows")]
498 {
499 forge_runtime::workflow::register_workflow_bridge(
500 workflow_bridge_executor.clone(),
501 &mut self.job_registry,
502 );
503 }
504
505 #[cfg(feature = "jobs")]
509 let job_dispatcher = {
510 let job_queue_for_dispatch = JobQueue::new(pool.clone());
511 Arc::new(JobDispatcher::new(
512 job_queue_for_dispatch,
513 self.job_registry.clone(),
514 ))
515 };
516
517 #[cfg(feature = "jobs")]
520 if roles.contains(&NodeRole::Worker) {
521 let mut node_capabilities: Vec<String> = self.config.node.worker_capabilities.clone();
522 for queue_name in self.config.worker.queues.keys() {
523 if !node_capabilities.iter().any(|c| c == queue_name) {
524 node_capabilities.push(queue_name.clone());
525 }
526 }
527
528 for (queue_name, queue_cfg) in &self.config.worker.queues {
529 if queue_cfg.workers == 0 {
530 continue;
531 }
532 let worker_id = Uuid::new_v4();
533 let claim_untagged = queue_name == forge_core::config::DEFAULT_QUEUE;
534 let worker_config = WorkerConfig {
535 id: Some(worker_id),
536 capabilities: vec![queue_name.clone()],
537 claim_untagged,
538 max_concurrent: queue_cfg.workers,
539 poll_interval: *self.config.worker.poll_interval,
540 ..Default::default()
541 };
542
543 let worker_base = Worker::new(
544 worker_config,
545 job_queue.clone(),
546 self.job_registry.clone(),
547 pool.clone(),
548 notify_bus.clone(),
549 )
550 .with_kv(Arc::clone(&kv_handle))
551 .with_job_dispatch(job_dispatcher.clone());
552
553 #[cfg(feature = "workflows")]
554 let mut worker =
555 worker_base.with_workflow_dispatch(workflow_bridge_executor.clone());
556 #[cfg(not(feature = "workflows"))]
557 let mut worker = worker_base;
558
559 let queue_label = queue_name.clone();
560 handles.push(tokio::spawn(async move {
561 if let Err(e) = worker.run().await {
562 tracing::error!(queue = %queue_label, "Worker error: {}", e);
563 }
564 }));
565
566 tracing::debug!(
567 queue = %queue_name,
568 workers = queue_cfg.workers,
569 "Job worker pool started",
570 );
571 }
572
573 let total_worker_concurrency: usize =
577 self.config.worker.queues.values().map(|q| q.workers).sum();
578 const PERSISTENT_CONN_OVERHEAD: usize = 6;
579 let min_recommended = total_worker_concurrency + PERSISTENT_CONN_OVERHEAD;
580 if (self.config.database.pool_size as usize) < min_recommended {
581 tracing::warn!(
582 pool_size = self.config.database.pool_size,
583 total_worker_concurrency,
584 min_recommended,
585 "database.pool_size ({}) is below the recommended minimum ({}) for the \
586 configured worker concurrency. \
587 Formula: sum(workers per queue) + 6 = {} + 6 = {}. \
588 Increase database.pool_size to avoid connection exhaustion under load.",
589 self.config.database.pool_size,
590 min_recommended,
591 total_worker_concurrency,
592 min_recommended,
593 );
594 }
595 }
596
597 #[cfg(feature = "jobs")]
598 if roles.contains(&NodeRole::Worker) {
599 let kv_pool = pool.clone();
600 let mut kv_shutdown = self.shutdown_tx.subscribe();
601 let kv_leader = leader_election.clone();
602 handles.push(tokio::spawn(async move {
603 let kv = forge_runtime::KvStore::new(kv_pool.clone(), "app");
604 let rate_limiter = forge_runtime::StrictRateLimiter::new(kv_pool);
605 loop {
606 tokio::select! {
607 _ = kv_shutdown.recv() => break,
608 _ = tokio::time::sleep(Duration::from_secs(300)) => {}
609 }
610 let is_leader = kv_leader.as_ref().map(|e| e.is_leader()).unwrap_or(true);
611 if !is_leader {
612 continue;
613 }
614 match kv.cleanup_expired().await {
615 Ok(n) if n > 0 => tracing::debug!(count = n, "KV TTL cleanup"),
616 Err(e) => tracing::warn!(error = %e, "KV TTL cleanup failed"),
617 _ => {}
618 }
619 let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
620 match rate_limiter.cleanup(cutoff).await {
621 Ok(n) if n > 0 => tracing::debug!(count = n, "Rate limit bucket cleanup"),
622 Err(e) => tracing::warn!(error = %e, "Rate limit cleanup failed"),
623 _ => {}
624 }
625 }
626 }));
627 }
628
629 #[cfg(feature = "cron")]
630 let cron_runner_handle: Option<Arc<CronRunner>> = if roles.contains(&NodeRole::Scheduler) {
631 let cron_registry = self.cron_registry.clone();
632 let cron_pool = pool.clone();
633 let cron_leader_election = leader_election.clone();
634
635 let cron_config = CronRunnerConfig {
636 poll_interval: *self.config.cron.poll_interval,
637 node_id: node_id.as_uuid(),
638 is_leader: cron_leader_election.is_none(),
639 leader_election: cron_leader_election,
640 run_stale_threshold: Duration::from_secs(15 * 60),
641 ..Default::default()
642 };
643
644 let cron_runner = Arc::new(CronRunner::new(
645 cron_registry,
646 cron_pool,
647 job_queue.clone(),
648 cron_config,
649 ));
650 let cron_runner_clone = cron_runner.clone();
651
652 leader_handles.push(tokio::spawn(async move {
653 if let Err(e) = cron_runner_clone.run().await {
654 tracing::error!("Cron runner error: {}", e);
655 }
656 }));
657
658 tracing::debug!("Cron scheduler started");
659 Some(cron_runner)
660 } else {
661 None
662 };
663
664 #[cfg(feature = "workflows")]
665 let workflow_shutdown_token = CancellationToken::new();
666 #[cfg(feature = "workflows")]
667 if roles.contains(&NodeRole::Scheduler) {
668 let event_store = Arc::new(EventStore::new(pool.clone()));
669 let scheduler = WorkflowScheduler::new(
670 pool.clone(),
671 job_queue.clone(),
672 event_store,
673 WorkflowSchedulerConfig {
674 poll_interval: *self.config.workflow.poll_interval,
675 leader_election: leader_election.clone(),
676 ..WorkflowSchedulerConfig::default()
677 },
678 notify_bus.clone(),
679 );
680
681 let shutdown_token = workflow_shutdown_token.clone();
682 leader_handles.push(tokio::spawn(async move {
683 scheduler.run(shutdown_token).await;
684 }));
685
686 tracing::debug!("Workflow scheduler started");
687 }
688
689 #[cfg(feature = "workflows")]
690 let workflow_executor = workflow_bridge_executor;
691
692 #[cfg(feature = "daemons")]
693 if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
694 let daemon_registry = self.daemon_registry.clone();
695 let daemon_pool = pool.clone();
696 let daemon_http = http_client.clone();
697 let daemon_shutdown_rx = self.shutdown_tx.subscribe();
698
699 let daemon_runner = DaemonRunner::new(
700 daemon_registry,
701 daemon_pool,
702 daemon_http,
703 node_id.as_uuid(),
704 daemon_shutdown_rx,
705 )
706 .with_config(forge_runtime::daemon::DaemonRunnerConfig {
707 health_check_interval: *self.config.daemon.health_check_interval,
708 heartbeat_interval: *self.config.daemon.heartbeat_interval,
709 });
710 #[cfg(feature = "jobs")]
711 let daemon_runner = daemon_runner.with_job_dispatch(job_dispatcher.clone());
712 #[cfg(feature = "workflows")]
713 let daemon_runner = daemon_runner.with_workflow_dispatch(workflow_executor.clone());
714 let daemon_runner = daemon_runner.with_kv(Arc::clone(&kv_handle));
715
716 leader_handles.push(tokio::spawn(async move {
717 if let Err(e) = daemon_runner.run().await {
718 tracing::error!("Daemon runner error: {}", e);
719 }
720 }));
721
722 tracing::debug!("Daemon runner started");
723 }
724
725 #[cfg(feature = "gateway")]
726 let mut reactor_handle = None;
727
728 #[cfg(feature = "gateway")]
729 if roles.contains(&NodeRole::Gateway) {
730 let tls: Option<TlsListenConfig> =
734 TlsListenConfig::from_core(&self.config.gateway.tls)?;
735
736 let any_requires_auth = self
739 .function_registry
740 .queries()
741 .any(|(_, info)| !info.is_public || info.required_role.is_some())
742 || self
743 .function_registry
744 .mutations()
745 .any(|(_, info)| !info.is_public || info.required_role.is_some());
746
747 if any_requires_auth && !self.config.auth.is_configured() {
748 return Err(ForgeError::config(
749 "One or more handlers require authentication (private scope or require_role) \
750 but auth is not configured. Set auth.jwt_secret (≥32 bytes) for HMAC or \
751 auth.jwks_url for external identity providers.",
752 ));
753 }
754
755 if self.config.gateway.cors_enabled
758 && self.config.gateway.cors_origins.iter().any(|o| o == "*")
759 {
760 let forge_env = std::env::var("FORGE_ENV").ok();
761 let is_dev = forge_env
762 .as_deref()
763 .is_some_and(|v| v.eq_ignore_ascii_case("development"));
764 if !is_dev {
765 let production_indicators = [
766 ("FORGE_ENV", std::env::var("FORGE_ENV").ok()),
767 ("NODE_ENV", std::env::var("NODE_ENV").ok()),
768 (
769 "RAILWAY_ENVIRONMENT",
770 std::env::var("RAILWAY_ENVIRONMENT").ok(),
771 ),
772 ("K_SERVICE", std::env::var("K_SERVICE").ok()),
773 ("FLY_APP_NAME", std::env::var("FLY_APP_NAME").ok()),
774 (
775 "KUBERNETES_SERVICE_HOST",
776 std::env::var("KUBERNETES_SERVICE_HOST").ok(),
777 ),
778 ("AWS_EXECUTION_ENV", std::env::var("AWS_EXECUTION_ENV").ok()),
779 ];
780 let hint = production_indicators
781 .iter()
782 .find_map(|(name, val)| {
783 val.as_ref().map(|v| format!(" ({name}={v} detected)"))
784 })
785 .unwrap_or_default();
786 return Err(ForgeError::config(format!(
787 "gateway.cors_origins = [\"*\"] is only allowed when FORGE_ENV=development{hint}. \
788 Set explicit origins (e.g. cors_origins = [\"https://yourdomain.com\"])."
789 )));
790 }
791 }
792
793 let gateway_config = RuntimeGatewayConfig {
794 port: self.config.gateway.port,
795 max_connections: self.config.gateway.max_connections,
796 sse_max_sessions: self.config.realtime.sse_max_sessions,
797 request_timeout_secs: self.config.gateway.request_timeout.as_secs(),
798 cors_enabled: self.config.gateway.cors_enabled,
799 cors_origins: self.config.gateway.cors_origins.clone(),
800 auth: AuthConfig::from_forge_config(&self.config.auth)
801 .map_err(|e| ForgeError::config(e.to_string()))?,
802 mcp: self.config.mcp.clone(),
803 quiet_paths: self.config.gateway.quiet_paths.clone(),
804 max_body_size_bytes: self.config.gateway.max_body_size.as_bytes(),
805 max_json_body_bytes: self.config.gateway.max_json_body_size.as_bytes(),
806 max_file_size_bytes: self.config.gateway.max_file_size.as_bytes(),
807 token_ttl: forge_core::AuthTokenTtl::new(
808 self.config.auth.access_token_ttl_secs(),
809 self.config.auth.refresh_token_ttl_days(),
810 ),
811 project_name: self.config.project.name.clone(),
812 tls,
813 reactor_config: {
814 let rt = &self.config.realtime;
815 ReactorConfig {
816 listener: ListenerConfig {
817 buffer_size: rt.postgres_change_buffer_size,
818 ..ListenerConfig::default()
819 },
820 invalidation: InvalidationConfig {
821 debounce_ms: rt.debounce_quiet_window.as_millis(),
822 max_debounce_ms: rt.debounce_max_wait.as_millis(),
823 ..InvalidationConfig::default()
824 },
825 realtime: RuntimeRealtimeConfig {
826 max_subscriptions_per_session: rt.subscription_max_per_session,
827 },
828 max_concurrent_reexecutions: rt.max_concurrent_reexecutions,
829 resync_interval_secs: rt.resync_interval.as_secs(),
830 shard_count: rt.shard_count,
831 ..ReactorConfig::default()
832 }
833 },
834 max_multipart_fields: self.config.gateway.max_multipart_fields,
835 max_sessions_per_user: self.config.realtime.max_sessions_per_user,
836 max_sessions_per_ip: self.config.realtime.max_sessions_per_ip,
837 max_subscriptions_per_user: self.config.realtime.max_subscriptions_per_user,
838 security_headers: self.config.gateway.security_headers,
839 hsts: self.config.gateway.hsts,
840 trusted_proxies: self
841 .config
842 .gateway
843 .trusted_proxies
844 .iter()
845 .filter_map(|s| {
846 s.parse::<ipnet::IpNet>()
847 .or_else(|_| s.parse::<std::net::IpAddr>().map(ipnet::IpNet::from))
848 .ok()
849 })
850 .collect(),
851 max_jobs_per_request: self.config.gateway.max_jobs_per_request,
852 max_result_size_bytes: self.config.gateway.max_result_size_bytes,
853 max_json_depth: self.config.gateway.max_json_depth,
854 };
855
856 let db_ref = self
857 .db
858 .clone()
859 .ok_or_else(|| ForgeError::internal("Database not initialized"))?;
860
861 let gateway = GatewayServer::new(
862 gateway_config,
863 self.function_registry.clone(),
864 db_ref.clone(),
865 notify_bus.clone(),
866 )
867 .with_node_id(self.node_id);
868 #[cfg(feature = "jobs")]
869 let gateway = gateway.with_job_dispatcher(job_dispatcher.clone());
870 #[cfg(feature = "workflows")]
871 let gateway = gateway.with_workflow_dispatcher(workflow_executor.clone());
872 let gateway = gateway.with_kv(Arc::clone(&kv_handle));
873 let mut gateway = gateway.with_mcp_registry(self.mcp_registry.clone());
874
875 if matches!(
876 self.config.rate_limit.mode,
877 forge_core::config::RateLimitMode::Hybrid
878 ) {
879 #[allow(clippy::disallowed_methods)]
881 let active_nodes: Option<i64> =
882 sqlx::query_scalar("SELECT COUNT(*) FROM forge_nodes WHERE status = 'active'")
883 .fetch_one(db_ref.primary())
884 .await
885 .ok();
886 if active_nodes.is_some_and(|n| n > 1) {
887 let n = active_nodes.unwrap_or(0);
888 tracing::warn!(
889 active_nodes = n,
890 "rate_limit.mode is 'hybrid' but {n} active nodes detected. \
891 Per-user/per-IP limits are local-only and effectively multiply by the \
892 node count. Set rate_limit.mode = \"strict\" for cluster deployments."
893 );
894 }
895 }
896
897 let rate_limiter: std::sync::Arc<dyn forge_core::rate_limit::RateLimiterBackend> =
898 match self.config.rate_limit.mode {
899 forge_core::config::RateLimitMode::Strict => std::sync::Arc::new(
900 forge_runtime::StrictRateLimiter::new(db_ref.primary().clone()),
901 ),
902 forge_core::config::RateLimitMode::Hybrid => {
903 std::sync::Arc::new(forge_runtime::HybridRateLimiter::with_max_buckets(
904 db_ref.primary().clone(),
905 self.config.rate_limit.max_local_buckets,
906 ))
907 }
908 };
909 gateway = gateway.with_rate_limiter(rate_limiter);
910 if let Some(resolver) = self.role_resolver.take() {
911 gateway = gateway.with_role_resolver(resolver);
912 }
913 if self.config.signals.enabled {
914 let signals_pool = std::sync::Arc::new(db_ref.primary().clone());
915 let collector = forge_runtime::signals::SignalsCollector::spawn(
916 signals_pool.clone(),
917 self.config.signals.batch_size,
918 *self.config.signals.flush_interval,
919 self.config.signals.channel_capacity,
920 );
921 let geoip = match &self.config.signals.geoip_db_path {
924 Some(path) => {
925 let resolver = forge_runtime::signals::geoip::GeoIpResolver::from_mmdb(
926 std::path::Path::new(path),
927 )?;
928 tracing::info!(path, "GeoIP: MaxMind MMDB loaded (city-level)");
929 resolver
930 }
931 None => forge_runtime::signals::geoip::GeoIpResolver::new(),
932 };
933 gateway = gateway
934 .with_signals_collector(collector)
935 .with_signals_anonymize_ip(self.config.signals.anonymize_ip)
936 .with_signals_geoip(geoip);
937
938 forge_runtime::signals::session::spawn_session_reaper(
939 signals_pool.clone(),
940 (self.config.signals.session_timeout.as_secs() / 60) as u32,
941 );
942
943 forge_runtime::signals::partition::ensure_partitions(&signals_pool).await;
944
945 {
946 let partition_pool = signals_pool.clone();
947 let retention_days = self.config.signals.retention_days;
948 let partition_leader = leader_election.clone();
949 let mut partition_shutdown = self.shutdown_tx.subscribe();
950 handles.push(tokio::spawn(async move {
951 loop {
952 tokio::select! {
953 _ = partition_shutdown.recv() => break,
954 _ = tokio::time::sleep(Duration::from_secs(21_600)) => {}
955 }
956 let is_leader = partition_leader
957 .as_ref()
958 .map(|e| e.is_leader())
959 .unwrap_or(true);
960 if is_leader {
961 forge_runtime::signals::partition::ensure_partitions(
962 &partition_pool,
963 )
964 .await;
965 forge_runtime::signals::partition::drop_old_partitions(
966 &partition_pool,
967 retention_days,
968 )
969 .await;
970 forge_runtime::signals::partition::check_default_partition(
971 &partition_pool,
972 )
973 .await;
974 }
975 }
976 }));
977 }
978
979 tracing::info!("Signals enabled (analytics + diagnostics)");
980 }
981
982 if let Some(factory) = self.custom_routes_factory.take() {
983 gateway = gateway.with_custom_routes(factory(pool.clone()));
984 tracing::debug!("Custom routes merged into gateway middleware stack");
985 }
986
987 let reactor = gateway.reactor();
988 if let Err(e) = reactor.start().await {
989 tracing::error!("Failed to start reactor: {}", e);
990 } else {
991 tracing::debug!("Reactor started");
992 reactor_handle = Some(reactor);
993 }
994
995 let api_router = gateway.router();
996 let mut router = Router::new().nest("/_api", api_router);
997
998 if !self.webhook_registry.is_empty() {
999 use axum::routing::post;
1000 use tower_http::cors::{Any, CorsLayer};
1001
1002 let webhook_state = WebhookState::new(self.webhook_registry.clone(), pool.clone());
1003 #[cfg(feature = "jobs")]
1004 let webhook_state = webhook_state.with_job_dispatcher(job_dispatcher.clone());
1005 #[cfg(feature = "workflows")]
1006 let webhook_state =
1007 webhook_state.with_workflow_dispatcher(workflow_executor.clone());
1008 let webhook_state = webhook_state.with_kv(Arc::clone(&kv_handle));
1009 let webhook_state = Arc::new(webhook_state);
1010
1011 let webhook_cors = if self.config.gateway.cors_enabled
1013 || !self.config.gateway.cors_origins.is_empty()
1014 {
1015 if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
1016 CorsLayer::new()
1017 .allow_origin(Any)
1018 .allow_methods(Any)
1019 .allow_headers(Any)
1020 } else {
1021 use axum::http::Method;
1022 let origins: Vec<_> = self
1023 .config
1024 .gateway
1025 .cors_origins
1026 .iter()
1027 .filter_map(|o| o.parse().ok())
1028 .collect();
1029 CorsLayer::new()
1030 .allow_origin(origins)
1031 .allow_methods([
1032 Method::GET,
1033 Method::POST,
1034 Method::PUT,
1035 Method::DELETE,
1036 Method::PATCH,
1037 Method::OPTIONS,
1038 ])
1039 .allow_headers([
1040 axum::http::header::CONTENT_TYPE,
1041 axum::http::header::AUTHORIZATION,
1042 axum::http::header::ACCEPT,
1043 axum::http::HeaderName::from_static("x-webhook-signature"),
1044 axum::http::HeaderName::from_static("x-idempotency-key"),
1045 ])
1046 .allow_credentials(true)
1047 }
1048 } else {
1049 CorsLayer::new()
1050 };
1051
1052 let webhook_router = Router::new()
1053 .route("/{*path}", post(webhook_handler).with_state(webhook_state))
1054 .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
1055 .layer(
1056 tower::ServiceBuilder::new()
1057 .layer(axum::error_handling::HandleErrorLayer::new(
1058 |err: tower::BoxError| async move {
1059 if err.is::<tower::timeout::error::Elapsed>() {
1060 return (
1061 axum::http::StatusCode::REQUEST_TIMEOUT,
1062 "Request timed out",
1063 );
1064 }
1065 (
1066 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1067 "Server overloaded",
1068 )
1069 },
1070 ))
1071 .layer(tower::limit::ConcurrencyLimitLayer::new(
1072 self.config.gateway.max_connections,
1073 ))
1074 .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
1075 self.config.gateway.request_timeout.as_secs(),
1076 ))),
1077 )
1078 .layer(webhook_cors);
1079
1080 router = router.nest("/_api/webhooks", webhook_router);
1081
1082 tracing::debug!(
1083 webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
1084 "Webhook routes registered"
1085 );
1086 }
1087
1088 if self.config.mcp.enabled {
1089 use axum::routing::get;
1090
1091 async fn oauth_not_supported() -> impl axum::response::IntoResponse {
1094 (
1095 axum::http::StatusCode::NOT_FOUND,
1096 axum::Json(serde_json::json!({
1097 "error": "oauth_not_supported",
1098 "error_description": "This server does not support OAuth. Connect without authentication."
1099 })),
1100 )
1101 }
1102
1103 #[cfg(feature = "mcp-oauth")]
1104 if let Some((oauth_api_router, oauth_state)) = gateway.oauth_router() {
1105 router = router.nest("/_api", oauth_api_router);
1106 router = router
1107 .route(
1108 "/.well-known/oauth-authorization-server",
1109 get(forge_runtime::gateway::oauth::well_known_oauth_metadata)
1110 .with_state(oauth_state.clone()),
1111 )
1112 .route(
1113 "/.well-known/oauth-protected-resource",
1114 get(forge_runtime::gateway::oauth::well_known_resource_metadata)
1115 .with_state(oauth_state),
1116 );
1117
1118 tracing::info!("OAuth 2.1 endpoints enabled for MCP");
1119 } else {
1120 router = router
1121 .route(
1122 "/.well-known/oauth-authorization-server",
1123 get(oauth_not_supported),
1124 )
1125 .route(
1126 "/.well-known/oauth-protected-resource",
1127 get(oauth_not_supported),
1128 );
1129 }
1130
1131 #[cfg(not(feature = "mcp-oauth"))]
1132 {
1133 router = router
1134 .route(
1135 "/.well-known/oauth-authorization-server",
1136 get(oauth_not_supported),
1137 )
1138 .route(
1139 "/.well-known/oauth-protected-resource",
1140 get(oauth_not_supported),
1141 );
1142 }
1143 }
1144
1145 if let Some(handler) = self.frontend_handler {
1146 use axum::routing::get;
1147 router = router.fallback(get(handler));
1148 tracing::debug!("Frontend handler enabled");
1149 }
1150
1151 let addr = gateway.addr();
1152 let tls = gateway.tls().cloned();
1153 let mut gateway_shutdown_rx = shutdown.subscribe();
1157
1158 handles.push(tokio::spawn(async move {
1159 tracing::debug!(addr = %addr, "Gateway server binding");
1160 let listener = match bind_listener(addr, tls.as_ref()).await {
1161 Ok(l) => l,
1162 Err(e) => {
1163 tracing::error!(error = %e, "Failed to bind gateway listener");
1164 return;
1165 }
1166 };
1167 let serve = axum::serve(listener, router).with_graceful_shutdown(async move {
1168 let _ = gateway_shutdown_rx.wait_for(|v| *v).await;
1169 tracing::debug!("Gateway draining in-flight requests");
1170 });
1171 if let Err(e) = serve.await {
1172 tracing::error!("Gateway server error: {}", e);
1173 }
1174 }));
1175 }
1176
1177 #[cfg(feature = "jobs")]
1178 let jobs_count = self.job_registry.len();
1179 #[cfg(not(feature = "jobs"))]
1180 let jobs_count: usize = 0;
1181 #[cfg(feature = "cron")]
1182 let crons_count = self.cron_registry.len();
1183 #[cfg(not(feature = "cron"))]
1184 let crons_count: usize = 0;
1185 #[cfg(feature = "workflows")]
1186 let workflows_count = self.workflow_registry.len();
1187 #[cfg(not(feature = "workflows"))]
1188 let workflows_count: usize = 0;
1189 #[cfg(feature = "daemons")]
1190 let daemons_count = self.daemon_registry.len();
1191 #[cfg(not(feature = "daemons"))]
1192 let daemons_count: usize = 0;
1193 #[cfg(feature = "gateway")]
1194 let webhooks_count = self.webhook_registry.len();
1195 #[cfg(not(feature = "gateway"))]
1196 let webhooks_count: usize = 0;
1197 #[cfg(feature = "gateway")]
1198 let mcp_tools_count = self.mcp_registry.len();
1199 #[cfg(not(feature = "gateway"))]
1200 let mcp_tools_count: usize = 0;
1201
1202 tracing::info!(
1203 queries = self.function_registry.queries().count(),
1204 mutations = self.function_registry.mutations().count(),
1205 jobs = jobs_count,
1206 crons = crons_count,
1207 workflows = workflows_count,
1208 daemons = daemons_count,
1209 webhooks = webhooks_count,
1210 mcp_tools = mcp_tools_count,
1211 "Functions registered"
1212 );
1213
1214 {
1215 let pool = pool.clone();
1216 tokio::spawn(async move {
1217 loop {
1218 tokio::time::sleep(Duration::from_secs(15)).await;
1219 forge_runtime::observability::record_pool_metrics(&pool);
1220 }
1221 });
1222 }
1223
1224 let role_names: Vec<&str> = roles.iter().map(|r| r.as_str()).collect();
1225 let capabilities = &self.config.node.worker_capabilities;
1226 tracing::info!(
1227 node_id = %node_id,
1228 project = %self.config.project.name,
1229 version = env!("CARGO_PKG_VERSION"),
1230 roles = ?role_names,
1231 worker_capabilities = ?capabilities,
1232 port = self.config.gateway.port,
1233 db_pool_size = self.config.database.pool_size,
1234 cluster_discovery = ?self.config.cluster.discovery,
1235 observability = self.config.observability.enabled,
1236 mcp = self.config.mcp.enabled,
1237 "Forge started"
1238 );
1239
1240 let mut shutdown_rx = self.shutdown_tx.subscribe();
1241
1242 tokio::select! {
1243 _ = tokio::signal::ctrl_c() => {
1244 tracing::debug!("Received ctrl-c");
1245 }
1246 _ = shutdown_rx.recv() => {
1247 tracing::debug!("Received shutdown notification");
1248 }
1249 }
1250
1251 tracing::debug!("Graceful shutdown starting");
1252
1253 let _ = self.shutdown_tx.send(());
1255
1256 #[cfg(feature = "workflows")]
1258 workflow_shutdown_token.cancel();
1259
1260 #[cfg(feature = "cron")]
1261 if let Some(ref runner) = cron_runner_handle {
1262 runner.stop().await;
1263 }
1264
1265 tracing::debug!("Waiting for leader-held tasks to drain");
1268 for handle in leader_handles {
1269 let _ = handle.await;
1270 }
1271 tracing::debug!("Leader-held tasks drained");
1272
1273 if let Err(e) = shutdown.shutdown().await {
1275 tracing::warn!(error = %e, "Shutdown error");
1276 }
1277
1278 if let Some(ref election) = leader_election {
1279 election.stop();
1280 }
1281
1282 #[cfg(feature = "gateway")]
1283 if let Some(ref reactor) = reactor_handle {
1284 reactor.stop();
1285 }
1286
1287 if let Some(ref db) = self.db {
1288 db.close().await;
1289 }
1290
1291 forge_runtime::shutdown_telemetry();
1292 tracing::info!("Forge stopped");
1293 Ok(())
1294 }
1295
1296 pub fn shutdown(&self) {
1298 let _ = self.shutdown_tx.send(());
1299 }
1300}
1301
#[cfg(test)]
// The `unwrap_used` and `indexing_slicing` clippy lints are relaxed for this
// test module only.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    // Unit tests for the builder and for config-role conversion.
    //
    // Everything that touches MCP tooling is gated on the `gateway` feature:
    // the parent module only imports `ForgeMcpTool` under that feature, so an
    // ungated fixture would stop the whole test module from compiling when
    // the feature is disabled.

    use super::*;
    #[cfg(feature = "gateway")]
    use std::future::Future;
    #[cfg(feature = "gateway")]
    use std::pin::Pin;

    use forge_core::config::NodeRole as ConfigNodeRole;
    #[cfg(feature = "gateway")]
    use forge_core::mcp::{ForgeMcpTool, McpToolAnnotations, McpToolInfo};

    // Minimal MCP tool fixture used to exercise tool registration.
    #[cfg(feature = "gateway")]
    struct TestMcpTool;

    #[cfg(feature = "gateway")]
    impl forge_core::__sealed::Sealed for TestMcpTool {}

    #[cfg(feature = "gateway")]
    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        // Static metadata: only the name is set, every optional field is
        // left empty/default.
        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        // Ignores its inputs and resolves immediately to `{"ok": true}`.
        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
        }
    }

    // A fresh builder starts without a configuration.
    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    // Building without a configuration must be rejected.
    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    // Supplying a configuration is sufficient for `build` to succeed.
    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    // Registering one MCP tool leaves exactly one entry in the registry.
    #[cfg(feature = "gateway")]
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let builder = ForgeBuilder::new().register_mcp_tool::<TestMcpTool>();
        assert_eq!(builder.mcp_registry.len(), 1);
    }

    // Each config-level role maps to the cluster role of the same name.
    #[test]
    fn test_config_role_conversion() {
        use builder::config_role_to_node_role;
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}