forge/runtime.rs

//! FORGE - The Rust Full-Stack Framework
//!
//! Single binary runtime that provides:
//! - HTTP Gateway with RPC endpoints
//! - WebSocket server for real-time subscriptions
//! - Background job workers
//! - Cron scheduler
//! - Workflow engine
//! - Observability dashboard
//! - Cluster coordination

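//! # Example
//!
//! A minimal sketch of starting the runtime. Illustrative only: it assumes
//! this crate is named `forge`, a Tokio main, and the `ForgeConfig`
//! constructor exercised in the tests at the bottom of this file.
//!
//! ```ignore
//! use forge::prelude::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     let config = ForgeConfig::default_with_database_url("postgres://localhost/app");
//!     Forge::builder().config(config).build()?.run().await
//! }
//! ```
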
use std::future::Future;
use std::net::IpAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use axum::body::Body;
use axum::http::Request;
use axum::response::Response;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
use forge_core::error::{ForgeError, Result};

use forge_runtime::cluster::{
    GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
    ShutdownConfig,
};
use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
use forge_runtime::dashboard::{
    DashboardConfig, DashboardState, create_api_router, create_dashboard_router,
};
use forge_runtime::db::Database;
use forge_runtime::function::FunctionRegistry;
use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
use forge_runtime::observability::{ObservabilityConfig, ObservabilityState};
use forge_runtime::realtime::{WebSocketConfig, WebSocketServer};
use forge_runtime::workflow::{
    EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
};

/// Type alias for frontend handler function.
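///
/// A minimal handler matching this signature (illustrative sketch; the
/// `spa_handler` name is hypothetical):
///
/// ```ignore
/// use std::future::Future;
/// use std::pin::Pin;
///
/// use axum::body::Body;
/// use axum::http::Request;
/// use axum::response::Response;
///
/// fn spa_handler(_req: Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>> {
///     Box::pin(async { Response::new(Body::from("<!doctype html><html>...</html>")) })
/// }
/// ```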
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;

/// Prelude module for common imports.
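///
/// Typical import in user code (assumes this crate is named `forge`):
///
/// ```ignore
/// use forge::prelude::*;
/// ```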
pub mod prelude {
    // Common types
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serde re-exports for user code
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Timestamp type alias for convenience.
    pub type Timestamp = DateTime<Utc>;

    // Core types
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        ActionContext, AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    pub use crate::{Forge, ForgeBuilder};
}

/// The main FORGE runtime.
pub struct Forge {
    config: ForgeConfig,
    db: Option<Database>,
    node_id: NodeId,
    function_registry: FunctionRegistry,
    job_registry: JobRegistry,
    cron_registry: Arc<CronRegistry>,
    workflow_registry: WorkflowRegistry,
    shutdown_tx: broadcast::Sender<()>,
    /// Path to user migrations directory (default: `./migrations`).
    migrations_dir: PathBuf,
    /// Additional migrations provided programmatically.
    extra_migrations: Vec<Migration>,
    /// Observability state (created during `run()`).
    observability: Option<ObservabilityState>,
    /// Optional frontend handler for embedded SPA.
    frontend_handler: Option<FrontendHandler>,
}

impl Forge {
    /// Create a new builder for configuring FORGE.
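    ///
    /// A minimal sketch (uses the `ForgeConfig` constructor exercised in the
    /// tests below; assumes this crate is named `forge`):
    ///
    /// ```ignore
    /// use forge::prelude::*;
    ///
    /// let config = ForgeConfig::default_with_database_url("postgres://localhost/app");
    /// let forge = Forge::builder().config(config).build().unwrap();
    /// ```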
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Get the configuration.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Get the function registry.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Get the function registry mutably.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get the job registry.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Get the job registry mutably.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get the cron registry.
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Get the workflow registry.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Get the workflow registry mutably.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Get the observability state (available after `run()` starts).
    pub fn observability(&self) -> Option<&ObservabilityState> {
        self.observability.as_ref()
    }

    /// Run the FORGE server.
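    ///
    /// Connects to the database, runs migrations (under a cluster-wide advisory
    /// lock), registers the node, starts the background tasks for this node's
    /// roles, and then blocks until a shutdown signal or `shutdown()` is received.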
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("FORGE runtime starting");

        // Connect to database
        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        self.db = Some(db);

        tracing::info!("Connected to database");

        // Run migrations with mesh-safe locking.
        // This acquires an advisory lock, so only one node runs migrations at a time.
        let runner = MigrationRunner::new(pool.clone());

        // Load user migrations from directory + any programmatic ones
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());

        runner.run(user_migrations).await?;
        tracing::info!("Migrations completed");

        // Create observability state
        let obs_config = ObservabilityConfig::default();
        let observability = ObservabilityState::new(obs_config, pool.clone());
        self.observability = Some(observability.clone());

        // Start observability background tasks
        let obs_handles = observability.start_background_tasks();
        tracing::info!(
            "Observability collectors started ({} background tasks)",
            obs_handles.len()
        );

        // Get local node info
        let hostname = hostname::get()
            .map(|h| h.to_string_lossy().to_string())
            .unwrap_or_else(|_| "unknown".to_string());
        let ip_address: IpAddr = "127.0.0.1".parse().unwrap();
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();

        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );

        let node_id = node_info.id;
        self.node_id = node_id;

        // Create node registry
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));

        // Register node in cluster
        if let Err(e) = node_registry.register().await {
            tracing::warn!("Failed to register node (tables may not exist): {}", e);
        }

        // Set node status to active
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::warn!("Failed to set node status: {}", e);
        }

        // Create leader election for scheduler role
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));

            // Try to become leader
            if let Err(e) = election.try_become_leader().await {
                tracing::warn!("Failed to acquire leadership: {}", e);
            }

            Some(election)
        } else {
            None
        };

        // Create graceful shutdown coordinator
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));

        // Create HTTP client for actions and crons
        let http_client = reqwest::Client::new();

        // Start background tasks based on roles
        let mut handles = Vec::new();

        // Start heartbeat loop
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }

        // Start leader election loop if scheduler role
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // Start job worker if worker role
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };

            let mut worker = Worker::with_observability(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                pool.clone(),
                observability.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));

            tracing::info!("Job worker started");
        }

        // Start cron runner if scheduler role; leadership is sampled once here
        // and passed to the runner as a flag.
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = pool.clone();
            let cron_http = http_client.clone();
            let is_leader = leader_election
                .as_ref()
                .map(|e| e.is_leader())
                .unwrap_or(false);

            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader,
            };

            let cron_runner = CronRunner::with_observability(
                cron_registry,
                cron_pool,
                cron_http,
                cron_config,
                observability.clone(),
            );

            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));

            tracing::info!("Cron scheduler started");
        }

        // Start workflow scheduler if scheduler role
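        // (the cancellation token is created unconditionally so shutdown can
        // cancel it regardless of role)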
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(pool.clone()));
            let scheduler = WorkflowScheduler::new(
                pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );

            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));

            tracing::info!("Workflow scheduler started");
        }

        // Reactor handle for shutdown
        let mut reactor_handle = None;

        // Create job dispatcher and workflow executor for dispatch capabilities
        let job_queue = JobQueue::new(pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(job_queue, self.job_registry.clone()));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            pool.clone(),
            http_client.clone(),
        ));

        // Start HTTP gateway if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                cors_enabled: true,
                cors_origins: vec!["*".to_string()],
                auth: AuthConfig::default(),
            };

            // Create dashboard state with registries and dispatchers
            let dashboard_state = DashboardState {
                pool: pool.clone(),
                config: DashboardConfig::default(),
                job_registry: self.job_registry.clone(),
                cron_registry: self.cron_registry.clone(),
                workflow_registry: self.workflow_registry.clone(),
                job_dispatcher: Some(job_dispatcher.clone()),
                workflow_executor: Some(workflow_executor.clone()),
            };

            // Build gateway router with dashboard and observability
            let gateway = GatewayServer::with_observability(
                gateway_config,
                self.function_registry.clone(),
                pool.clone(),
                observability.clone(),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone());

            // Start the reactor for real-time updates
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::info!("Reactor started for real-time updates");
                reactor_handle = Some(reactor);
            }

            let mut router = gateway.router();

            // Mount dashboard at /_dashboard and API at /_api
            router = router
                .nest(
                    "/_dashboard",
                    create_dashboard_router(dashboard_state.clone()),
                )
                .nest("/_api", create_api_router(dashboard_state));

            // Add frontend handler as fallback if configured
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                router = router.fallback(get(handler));
                tracing::info!("Frontend handler enabled - serving embedded assets");
            }

            let addr = gateway.addr();

            handles.push(tokio::spawn(async move {
                tracing::info!("Gateway server listening on {}", addr);
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));

            tracing::info!("HTTP gateway started on port {}", self.config.gateway.port);
        }

        // Start WebSocket server if gateway role
        if roles.contains(&NodeRole::Gateway) {
            let ws_config = WebSocketConfig::default();
            let _ws_server = WebSocketServer::new(node_id, ws_config);

            // WebSocket upgrade handling would be added to the gateway router.
            // For now, we just hold the server state.
            tracing::info!("WebSocket server initialized");
        }

        tracing::info!("FORGE runtime started successfully");
        tracing::info!("  Node ID: {}", node_id);
        tracing::info!("  Roles: {:?}", roles);

        // Wait for shutdown signal
        let mut shutdown_rx = self.shutdown_tx.subscribe();

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("Received shutdown signal");
            }
            _ = shutdown_rx.recv() => {
                tracing::info!("Received shutdown notification");
            }
        }

        // Graceful shutdown
        tracing::info!("Starting graceful shutdown...");

        // Stop workflow scheduler
        workflow_shutdown_token.cancel();
        tracing::info!("Workflow scheduler stopped");

        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!("Shutdown error: {}", e);
        }

        // Stop leader election
        if let Some(ref election) = leader_election {
            election.stop();
        }

        // Stop reactor before closing database
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
            tracing::info!("Reactor stopped");
        }

        // Shutdown observability (final flush)
        observability.shutdown().await;
        tracing::info!("Observability shutdown complete");

        // Close database connections
        if let Some(ref db) = self.db {
            db.close().await;
        }

        tracing::info!("FORGE runtime stopped");
        Ok(())
    }

    /// Request shutdown.
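    ///
    /// Signals `run()` over the internal broadcast channel; the send result is
    /// ignored if no receiver is currently listening.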
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}

/// Builder for configuring the FORGE runtime.
pub struct ForgeBuilder {
    config: Option<ForgeConfig>,
    function_registry: FunctionRegistry,
    job_registry: JobRegistry,
    cron_registry: CronRegistry,
    workflow_registry: WorkflowRegistry,
    migrations_dir: PathBuf,
    extra_migrations: Vec<Migration>,
    frontend_handler: Option<FrontendHandler>,
}

impl ForgeBuilder {
    /// Create a new builder.
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
        }
    }

    /// Set the directory to load migrations from.
    ///
    /// Defaults to `./migrations`. Migration files should be named like:
    /// - `0001_create_users.sql`
    /// - `0002_add_posts.sql`
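    ///
    /// Illustrative sketch (the path is hypothetical):
    ///
    /// ```ignore
    /// let builder = Forge::builder().migrations_dir("db/migrations");
    /// ```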
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Add a migration programmatically.
    ///
    /// Use this for migrations that need to be generated at runtime,
    /// or for testing. For most cases, use migration files instead.
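    ///
    /// Illustrative sketch (the table schema is hypothetical):
    ///
    /// ```ignore
    /// let builder = Forge::builder()
    ///     .migration("0001_create_users", "CREATE TABLE users (id UUID PRIMARY KEY);");
    /// ```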
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Set a frontend handler for serving embedded SPA assets.
    ///
    /// Use with the `embedded-frontend` feature to build a single binary
    /// that includes both backend and frontend.
    pub fn frontend_handler(mut self, handler: FrontendHandler) -> Self {
        self.frontend_handler = Some(handler);
        self
    }

    /// Set the configuration.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Get mutable access to the function registry.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Get mutable access to the job registry.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Get mutable access to the cron registry.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Get mutable access to the workflow registry.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Build the FORGE runtime.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;

        let (shutdown_tx, _) = broadcast::channel(1);

        Ok(Forge {
            config,
            db: None,
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            observability: None,
            frontend_handler: self.frontend_handler,
        })
    }
}

impl Default for ForgeBuilder {
    fn default() -> Self {
        Self::new()
    }
}

/// Convert config NodeRole to cluster NodeRole.
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Gateway => NodeRole::Gateway,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_forge_builder_new() {
        let builder = ForgeBuilder::new();
        assert!(builder.config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        let builder = ForgeBuilder::new();
        let result = builder.build();
        assert!(result.is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
        let result = ForgeBuilder::new().config(config).build();
        assert!(result.is_ok());
    }

    #[test]
    fn test_config_role_conversion() {
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Gateway),
            NodeRole::Gateway
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Worker),
            NodeRole::Worker
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Scheduler),
            NodeRole::Scheduler
        );
        assert_eq!(
            config_role_to_node_role(&ConfigNodeRole::Function),
            NodeRole::Function
        );
    }
}