1use std::future::Future;
13use std::net::IpAddr;
14use std::path::PathBuf;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Duration;
18
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
25use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
26use forge_core::error::{ForgeError, Result};
27use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
28
29use forge_runtime::cluster::{
30 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
31 ShutdownConfig,
32};
33use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
34use forge_runtime::dashboard::{
35 DashboardConfig, DashboardState, create_api_router, create_dashboard_router,
36};
37use forge_runtime::db::Database;
38use forge_runtime::function::FunctionRegistry;
39use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
40use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
41use forge_runtime::observability::{ObservabilityConfig, ObservabilityState};
42use forge_runtime::realtime::{WebSocketConfig, WebSocketServer};
43use forge_runtime::workflow::{
44 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
45};
46use tokio_util::sync::CancellationToken;
47
/// Signature of the optional gateway fallback handler: takes an incoming HTTP
/// request and returns a boxed response future. Installed via
/// `ForgeBuilder::frontend_handler` and mounted as the router fallback in
/// `Forge::run` (used to serve embedded frontend assets).
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
50
/// Convenience re-exports for application code: `use forge::prelude::*;`
/// brings in the common core traits, context types, and serialization helpers.
pub mod prelude {
    // Time and identifier types.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serialization (serde derive macros plus the JSON crate itself).
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Shorthand for a UTC timestamp.
    pub type Timestamp = DateTime<Utc>;

    // Core framework types: roles, config, per-feature contexts and traits.
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        ActionContext, AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Entry points of this crate.
    pub use crate::{Forge, ForgeBuilder};
}
80
/// The FORGE runtime. Owns the configuration, function/job/cron/workflow
/// registries, and (once [`Forge::run`] has started) the database handle and
/// observability state. Construct via [`Forge::builder`].
pub struct Forge {
    // Runtime configuration (database, gateway, node roles, worker tuning).
    config: ForgeConfig,
    // Database handle; `None` until `run()` connects.
    db: Option<Database>,
    // This node's cluster id; replaced by `run()` with the id from the
    // freshly built `NodeInfo`.
    node_id: NodeId,
    // Registered query/mutation/action functions served by the gateway.
    function_registry: FunctionRegistry,
    // Registered background-job handlers executed by the worker.
    job_registry: JobRegistry,
    // Shared cron job registry, cloned into the cron runner.
    cron_registry: Arc<CronRegistry>,
    // Registered workflows, cloned into scheduler and executor.
    workflow_registry: WorkflowRegistry,
    // Broadcast used by `shutdown()` to signal `run()` to stop.
    shutdown_tx: broadcast::Sender<()>,
    // Directory that user migrations are loaded from at startup.
    migrations_dir: PathBuf,
    // Extra in-code migrations appended after the directory ones.
    extra_migrations: Vec<Migration>,
    // Observability state; populated by `run()` after startup.
    observability: Option<ObservabilityState>,
    // Optional gateway fallback for serving embedded frontend assets.
    frontend_handler: Option<FrontendHandler>,
}
100
impl Forge {
    /// Create a new [`ForgeBuilder`] for configuring a runtime instance.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// This node's cluster identifier.
    ///
    /// NOTE(review): before `run()` executes this is the id assigned by the
    /// builder; `run()` overwrites it with the id from the `NodeInfo` it
    /// builds — confirm callers only read it after startup.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Borrow the runtime configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Shared access to the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Mutable access to the function registry (register handlers here
    /// before calling `run()`).
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Shared access to the background-job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Mutable access to the background-job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Clone of the shared cron registry handle.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Shared access to the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Observability state, populated once `run()` has started.
    pub fn observability(&self) -> Option<&ObservabilityState> {
        self.observability.as_ref()
    }

    /// Run the runtime until a shutdown signal arrives.
    ///
    /// Startup order: connect to the database, run migrations, start
    /// observability collectors, register this node, (optionally) contend for
    /// scheduler leadership, then spawn the background services selected by
    /// the node's configured roles (heartbeat, leader-election loop, job
    /// worker, cron runner, workflow scheduler, HTTP gateway, WebSocket
    /// server). Blocks until Ctrl-C or a `shutdown()` notification, then
    /// tears everything down in reverse dependency order.
    ///
    /// # Errors
    /// Returns an error if the database connection or migrations fail;
    /// most later failures are logged as warnings/errors instead of aborting.
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("FORGE runtime starting");

        // --- Database ---
        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        self.db = Some(db);

        tracing::info!("Connected to database");

        // --- Migrations: directory migrations first, then builder-supplied
        // extras appended after them. ---
        let runner = MigrationRunner::new(pool.clone());

        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::info!("Migrations completed");

        // --- Observability collectors (background tasks). ---
        let obs_config = ObservabilityConfig::default();
        let observability = ObservabilityState::new(obs_config, pool.clone());
        self.observability = Some(observability.clone());

        let obs_handles = observability.start_background_tasks();
        tracing::info!(
            "Observability collectors started ({} background tasks)",
            obs_handles.len()
        );

        // --- Build this node's identity. Hostname failures degrade to
        // "unknown" rather than aborting startup. ---
        let hostname = hostname::get()
            .map(|h| h.to_string_lossy().to_string())
            .unwrap_or_else(|_| "unknown".to_string());

        // NOTE(review): advertised address is hard-coded to loopback —
        // confirm whether this should come from config for multi-node setups.
        let ip_address: IpAddr = "127.0.0.1".parse().unwrap();
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        // Adopt the id generated for the NodeInfo as this runtime's id.
        let node_id = node_info.id;
        self.node_id = node_id;

        // --- Cluster registration; failures are warnings because the
        // cluster tables may not exist yet on first boot. ---
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        if let Err(e) = node_registry.register().await {
            tracing::warn!("Failed to register node (tables may not exist): {}", e);
        }

        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::warn!("Failed to set node status: {}", e);
        }

        // --- Leader election: only scheduler nodes contend. An initial
        // acquisition failure is tolerated; the election loop below retries. ---
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            if let Err(e) = election.try_become_leader().await {
                tracing::warn!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Graceful-shutdown coordinator: deregisters the node and releases
        // leadership (if held) during teardown.
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Shared HTTP client handed to cron and workflow components.
        let http_client = reqwest::Client::new();

        // Handles for all spawned background tasks (kept alive, not joined).
        let mut handles = Vec::new();

        // --- Heartbeat loop: keeps this node's liveness fresh in the DB. ---
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // --- Leader-election maintenance loop (scheduler nodes only). ---
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // --- Job worker (Worker role): polls the queue and executes jobs. ---
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::with_observability(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                pool.clone(),
                observability.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::info!("Job worker started");
        }

        // --- Cron runner (Scheduler role). `is_leader` is sampled once at
        // startup here; the runner receives that snapshot in its config. ---
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = pool.clone();
            let cron_http = http_client.clone();
            let is_leader = leader_election
                .as_ref()
                .map(|e| e.is_leader())
                .unwrap_or(false);

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader,
            };

            let cron_runner = CronRunner::with_observability(
                cron_registry,
                cron_pool,
                cron_http,
                cron_config,
                observability.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::info!("Cron scheduler started");
        }

        // --- Workflow scheduler (Scheduler role), stopped later via this
        // cancellation token. ---
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(pool.clone()));
            let scheduler = WorkflowScheduler::new(
                pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::info!("Workflow scheduler started");
        }

        // Reactor handle (real-time updates), set only if the gateway's
        // reactor starts successfully below.
        let mut reactor_handle = None;

        // Dispatchers shared with the gateway/dashboard regardless of role
        // checks below.
        let job_queue = JobQueue::new(pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(job_queue, self.job_registry.clone()));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            pool.clone(),
            http_client.clone(),
        ));

        // --- HTTP gateway (Gateway role): API + dashboard + optional
        // frontend fallback, served by axum on the configured port. ---
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                cors_enabled: true,
                // NOTE(review): CORS is wide open ("*") — confirm this is
                // intended for production deployments.
                cors_origins: vec!["*".to_string()],
                auth: AuthConfig::default(),
            };

            let dashboard_state = DashboardState {
                pool: pool.clone(),
                config: DashboardConfig::default(),
                job_registry: self.job_registry.clone(),
                cron_registry: self.cron_registry.clone(),
                workflow_registry: self.workflow_registry.clone(),
                job_dispatcher: Some(job_dispatcher.clone()),
                workflow_executor: Some(workflow_executor.clone()),
            };

            let gateway = GatewayServer::with_observability(
                gateway_config,
                self.function_registry.clone(),
                pool.clone(),
                observability.clone(),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone());

            // Reactor startup failure is non-fatal: gateway still serves,
            // just without real-time updates.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::info!("Reactor started for real-time updates");
                reactor_handle = Some(reactor);
            }

            let mut router = gateway.router();

            // Mount the dashboard UI and its API under reserved prefixes.
            router = router
                .nest(
                    "/_dashboard",
                    create_dashboard_router(dashboard_state.clone()),
                )
                .nest("/_api", create_api_router(dashboard_state));

            // Unmatched routes fall through to the embedded frontend handler.
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::info!("Frontend handler enabled - serving embedded assets");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::info!("Gateway server listening on {}", addr);
                // Bind failure panics the gateway task (not the runtime).
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));

            tracing::info!("HTTP gateway started on port {}", self.config.gateway.port);
        }

        // --- WebSocket server (Gateway role). NOTE(review): constructed but
        // immediately dropped (`_ws_server`) — confirm whether it is meant to
        // be wired into the gateway. ---
        if roles.contains(&NodeRole::Gateway) {
            let ws_config = WebSocketConfig::default();
            let _ws_server = WebSocketServer::new(node_id, ws_config);

            tracing::info!("WebSocket server initialized");
        }

        tracing::info!("FORGE runtime started successfully");
        tracing::info!(" Node ID: {}", node_id);
        tracing::info!(" Roles: {:?}", roles);

        // --- Block until Ctrl-C or a programmatic `shutdown()` call. ---
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("Received shutdown signal");
            }
            _ = shutdown_rx.recv() => {
                tracing::info!("Received shutdown notification");
            }
        }

        tracing::info!("Starting graceful shutdown...");

        // Teardown: workflow scheduler first, then cluster deregistration,
        // election, reactor, observability, and finally the database pool.
        workflow_shutdown_token.cancel();
        tracing::info!("Workflow scheduler stopped");

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!("Shutdown error: {}", e);
        }

        if let Some(ref election) = leader_election {
            election.stop();
        }

        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
            tracing::info!("Reactor stopped");
        }

        observability.shutdown().await;
        tracing::info!("Observability shutdown complete");

        if let Some(ref db) = self.db {
            db.close().await;
        }

        tracing::info!("FORGE runtime stopped");
        Ok(())
    }

    /// Signal a running `run()` future to begin graceful shutdown.
    /// The send result is ignored: an error only means `run()` is not
    /// currently listening.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
518
/// Builder for [`Forge`]. Collects registries, migration sources, and the
/// (required) configuration before `build()` produces a runtime instance.
pub struct ForgeBuilder {
    // Required: `build()` returns an error when this is `None`.
    config: Option<ForgeConfig>,
    // Registries populated by the application before building.
    function_registry: FunctionRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    // Directory migrations are loaded from (default: "migrations").
    migrations_dir: PathBuf,
    // In-code migrations appended after the directory ones.
    extra_migrations: Vec<Migration>,
    // Optional gateway fallback for serving embedded frontend assets.
    frontend_handler: Option<FrontendHandler>,
}
530
531impl ForgeBuilder {
532 pub fn new() -> Self {
534 Self {
535 config: None,
536 function_registry: FunctionRegistry::new(),
537 job_registry: JobRegistry::new(),
538 cron_registry: CronRegistry::new(),
539 workflow_registry: WorkflowRegistry::new(),
540 migrations_dir: PathBuf::from("migrations"),
541 extra_migrations: Vec::new(),
542 frontend_handler: None,
543 }
544 }
545
546 pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
552 self.migrations_dir = path.into();
553 self
554 }
555
556 pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
561 self.extra_migrations.push(Migration::new(name, sql));
562 self
563 }
564
565 pub fn frontend_handler(&mut self, handler: FrontendHandler) {
570 self.frontend_handler = Some(handler);
571 }
572
573 pub fn config(mut self, config: ForgeConfig) -> Self {
575 self.config = Some(config);
576 self
577 }
578
579 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
581 &mut self.function_registry
582 }
583
584 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
586 &mut self.job_registry
587 }
588
589 pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
591 &mut self.cron_registry
592 }
593
594 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
596 &mut self.workflow_registry
597 }
598
599 pub fn build(self) -> Result<Forge> {
601 let config = self
602 .config
603 .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
604
605 let (shutdown_tx, _) = broadcast::channel(1);
606
607 Ok(Forge {
608 config,
609 db: None,
610 node_id: NodeId::new(),
611 function_registry: self.function_registry,
612 job_registry: self.job_registry,
613 cron_registry: Arc::new(self.cron_registry),
614 workflow_registry: self.workflow_registry,
615 shutdown_tx,
616 migrations_dir: self.migrations_dir,
617 extra_migrations: self.extra_migrations,
618 observability: None,
619 frontend_handler: self.frontend_handler,
620 })
621 }
622}
623
624impl Default for ForgeBuilder {
625 fn default() -> Self {
626 Self::new()
627 }
628}
629
630fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
632 match role {
633 ConfigNodeRole::Gateway => NodeRole::Gateway,
634 ConfigNodeRole::Function => NodeRole::Function,
635 ConfigNodeRole::Worker => NodeRole::Worker,
636 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
637 }
638}
639
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh builder starts without a configuration.
    #[test]
    fn test_forge_builder_new() {
        assert!(ForgeBuilder::new().config.is_none());
    }

    // Building without a configuration must fail.
    #[test]
    fn test_forge_builder_requires_config() {
        assert!(ForgeBuilder::new().build().is_err());
    }

    // Supplying a configuration makes the build succeed.
    #[test]
    fn test_forge_builder_with_config() {
        let cfg = ForgeConfig::default_with_database_url("postgres://localhost/test");
        assert!(ForgeBuilder::new().config(cfg).build().is_ok());
    }

    // Every config role maps onto its matching cluster role.
    #[test]
    fn test_config_role_conversion() {
        let expectations = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
            (ConfigNodeRole::Function, NodeRole::Function),
        ];
        for (input, expected) in expectations {
            assert_eq!(config_role_to_node_role(&input), expected);
        }
    }
}