use std::future::Future;
use std::net::IpAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use axum::body::Body;
use axum::http::Request;
use axum::response::Response;
use tokio::sync::broadcast;
use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
use forge_core::error::{ForgeError, Result};
use forge_runtime::migrations::{load_migrations_from_dir, Migration, MigrationRunner};
use forge_runtime::cluster::{
GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
ShutdownConfig,
};
use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
use forge_runtime::dashboard::{
create_api_router, create_dashboard_router, DashboardConfig, DashboardState,
};
use forge_runtime::db::Database;
use forge_runtime::function::FunctionRegistry;
use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
use forge_runtime::observability::{ObservabilityConfig, ObservabilityState};
use forge_runtime::realtime::{WebSocketConfig, WebSocketServer};
use forge_runtime::workflow::{
EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
};
use tokio_util::sync::CancellationToken;
/// Signature of an application-supplied fallback HTTP handler, used to serve an
/// embedded frontend. Registered via `ForgeBuilder::frontend_handler` and
/// mounted as the gateway router's fallback in `Forge::run`.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
/// Convenience re-exports so application code can write `use forge::prelude::*;`
/// and get the common FORGE context/trait types plus the usual
/// serde/chrono/uuid items in one import.
pub mod prelude {
    // Third-party essentials re-exported for application convenience.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// Shorthand for the UTC timestamp type used throughout FORGE code.
    pub type Timestamp = DateTime<Utc>;

    // Core framework types: config, errors, and the per-feature context/trait
    // pairs (functions, jobs, cron, workflows, realtime, schema).
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        ActionContext, AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
    pub use crate::{Forge, ForgeBuilder};

    // Test-support helpers, available only with the `testing` feature enabled.
    #[cfg(feature = "testing")]
    pub use forge_core::testing::{
        assert_json_matches,
        error_contains,
        validation_error_for_field,
        DispatchedJob,
        IsolatedTestDb,
        MockHttp,
        MockHttpBuilder,
        MockJobDispatch,
        MockRequest,
        MockResponse,
        MockWorkflowDispatch,
        StartedWorkflow,
        TestActionContext,
        TestActionContextBuilder,
        TestCronContext,
        TestCronContextBuilder,
        TestDatabase,
        TestJobContext,
        TestJobContextBuilder,
        TestMutationContext,
        TestMutationContextBuilder,
        TestQueryContext,
        TestQueryContextBuilder,
        TestWorkflowContext,
        TestWorkflowContextBuilder,
    };
    // Assertion macros are exported from forge_core's crate root, not the
    // `testing` module, hence the separate re-export list.
    #[cfg(feature = "testing")]
    pub use forge_core::{
        assert_err, assert_err_variant, assert_http_called, assert_http_not_called,
        assert_job_dispatched, assert_job_not_dispatched, assert_ok, assert_workflow_not_started,
        assert_workflow_started,
    };
}
/// The assembled FORGE runtime: configuration plus every registry and handle
/// that [`Forge::run`] needs to boot the node. Construct via [`Forge::builder`].
pub struct Forge {
    /// Full runtime configuration (database, gateway, node roles, worker).
    config: ForgeConfig,
    /// Database handle; `None` until `run()` connects.
    db: Option<Database>,
    /// Cluster identity of this node. Set to a fresh id at build time and
    /// replaced in `run()` by the id of the locally created `NodeInfo`.
    node_id: NodeId,
    /// Query/mutation/action functions served by the gateway.
    function_registry: FunctionRegistry,
    /// Background job handlers executed by the worker.
    job_registry: JobRegistry,
    /// Cron entries, shared (via `Arc`) with the cron runner task.
    cron_registry: Arc<CronRegistry>,
    /// Workflow definitions used by the scheduler and executor.
    workflow_registry: WorkflowRegistry,
    /// Broadcast channel used by `shutdown()` to stop a running `run()`.
    shutdown_tx: broadcast::Sender<()>,
    /// Directory SQL migrations are loaded from at startup.
    migrations_dir: PathBuf,
    /// Migrations registered in code via `ForgeBuilder::migration`.
    extra_migrations: Vec<Migration>,
    /// Metrics/observability state; `None` until `run()` initializes it.
    observability: Option<ObservabilityState>,
    /// Optional fallback handler for serving an embedded frontend.
    frontend_handler: Option<FrontendHandler>,
}
impl Forge {
    /// Create a [`ForgeBuilder`] for assembling a runtime instance.
    pub fn builder() -> ForgeBuilder {
        ForgeBuilder::new()
    }

    /// Cluster identifier of this node.
    ///
    /// Before `run()` executes this is the placeholder id generated at build
    /// time; `run()` overwrites it with the id of the registered `NodeInfo`.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// The runtime configuration this instance was built with.
    pub fn config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Registry of functions served by the gateway.
    pub fn function_registry(&self) -> &FunctionRegistry {
        &self.function_registry
    }

    /// Mutable access to the function registry (register before `run()`).
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Registry of background job handlers.
    pub fn job_registry(&self) -> &JobRegistry {
        &self.job_registry
    }

    /// Mutable access to the job registry (register before `run()`).
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Shared handle to the cron registry (cheap `Arc` clone).
    pub fn cron_registry(&self) -> Arc<CronRegistry> {
        self.cron_registry.clone()
    }

    /// Registry of workflow definitions.
    pub fn workflow_registry(&self) -> &WorkflowRegistry {
        &self.workflow_registry
    }

    /// Mutable access to the workflow registry (register before `run()`).
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Observability state; `None` until `run()` initializes it.
    pub fn observability(&self) -> Option<&ObservabilityState> {
        self.observability.as_ref()
    }

    /// Boot the full runtime and block until a shutdown signal arrives.
    ///
    /// Startup order: database + migrations, observability, node registration,
    /// leader election, background loops (heartbeat, election renewal, job
    /// worker, cron runner, workflow scheduler), then the HTTP gateway.
    /// Teardown runs roughly in reverse once Ctrl-C or `shutdown()` fires.
    ///
    /// # Errors
    /// Fails only if the database connection or migration run fails; later
    /// startup problems (node registration, leadership, reactor) are logged
    /// and tolerated.
    pub async fn run(mut self) -> Result<()> {
        tracing::info!("FORGE runtime starting");

        // --- Database connection and migrations ----------------------------
        let db = Database::from_config(&self.config.database).await?;
        let pool = db.primary().clone();
        self.db = Some(db);
        tracing::info!("Connected to database");
        let runner = MigrationRunner::new(pool.clone());
        // On-disk migrations first, then migrations registered in code.
        let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
        user_migrations.extend(self.extra_migrations.clone());
        runner.run(user_migrations).await?;
        tracing::info!("Migrations completed");

        // --- Observability --------------------------------------------------
        let obs_config = ObservabilityConfig::default();
        let observability = ObservabilityState::new(obs_config, pool.clone());
        self.observability = Some(observability.clone());
        let obs_handles = observability.start_background_tasks();
        tracing::info!(
            "Observability collectors started ({} background tasks)",
            obs_handles.len()
        );

        // --- Node identity and cluster registration -------------------------
        let hostname = hostname::get()
            .map(|h| h.to_string_lossy().to_string())
            .unwrap_or_else(|_| "unknown".to_string());
        // NOTE(review): the advertised IP is hard-coded to loopback — confirm
        // this is acceptable for multi-host clusters.
        let ip_address: IpAddr = "127.0.0.1".parse().unwrap();
        let roles: Vec<NodeRole> = self
            .config
            .node
            .roles
            .iter()
            .map(config_role_to_node_role)
            .collect();
        let node_info = NodeInfo::new_local(
            hostname,
            ip_address,
            self.config.gateway.port,
            self.config.gateway.grpc_port,
            roles.clone(),
            self.config.node.worker_capabilities.clone(),
            env!("CARGO_PKG_VERSION").to_string(),
        );
        let node_id = node_info.id;
        // Replace the placeholder id assigned at build time.
        self.node_id = node_id;
        let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
        // Registration failures are tolerated so the runtime can still boot
        // before the cluster tables exist (e.g. a first run).
        if let Err(e) = node_registry.register().await {
            tracing::warn!("Failed to register node (tables may not exist): {}", e);
        }
        if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
            tracing::warn!("Failed to set node status: {}", e);
        }

        // --- Leader election (scheduler nodes only) -------------------------
        let leader_election = if roles.contains(&NodeRole::Scheduler) {
            let election = Arc::new(LeaderElection::new(
                pool.clone(),
                node_id,
                LeaderRole::Scheduler,
                LeaderConfig::default(),
            ));
            // Best-effort initial attempt; the spawned `election.run()` loop
            // below continues trying in the background.
            if let Err(e) = election.try_become_leader().await {
                tracing::warn!("Failed to acquire leadership: {}", e);
            }
            Some(election)
        } else {
            None
        };
        let shutdown = Arc::new(GracefulShutdown::new(
            node_registry.clone(),
            leader_election.clone(),
            ShutdownConfig::default(),
        ));
        let http_client = reqwest::Client::new();
        // Handles of all spawned background tasks. They are collected but not
        // awaited on shutdown; tasks end when the process exits.
        let mut handles = Vec::new();

        // Heartbeat loop keeps this node's liveness record fresh.
        {
            let heartbeat_pool = pool.clone();
            let heartbeat_node_id = node_id;
            let config = HeartbeatConfig::default();
            handles.push(tokio::spawn(async move {
                let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
                heartbeat.run().await;
            }));
        }
        // Background leadership maintenance loop.
        if let Some(ref election) = leader_election {
            let election = election.clone();
            handles.push(tokio::spawn(async move {
                election.run().await;
            }));
        }

        // --- Job worker (worker nodes only) ---------------------------------
        if roles.contains(&NodeRole::Worker) {
            let job_queue = JobQueue::new(pool.clone());
            let worker_config = WorkerConfig {
                id: Some(node_id.as_uuid()),
                capabilities: self.config.node.worker_capabilities.clone(),
                max_concurrent: self.config.worker.max_concurrent_jobs,
                poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
                ..Default::default()
            };
            let mut worker = Worker::with_observability(
                worker_config,
                job_queue,
                self.job_registry.clone(),
                pool.clone(),
                observability.clone(),
            );
            handles.push(tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("Worker error: {}", e);
                }
            }));
            tracing::info!("Job worker started");
        }

        // --- Cron runner (scheduler nodes only) -----------------------------
        if roles.contains(&NodeRole::Scheduler) {
            let cron_registry = self.cron_registry.clone();
            let cron_pool = pool.clone();
            let cron_http = http_client.clone();
            // NOTE(review): leadership is sampled once at startup and baked
            // into the config — confirm CronRunner tracks later leadership
            // changes on its own, otherwise a late-elected leader never runs
            // leader-only crons.
            let is_leader = leader_election
                .as_ref()
                .map(|e| e.is_leader())
                .unwrap_or(false);
            let cron_config = CronRunnerConfig {
                poll_interval: Duration::from_secs(1),
                node_id: node_id.as_uuid(),
                is_leader,
            };
            let cron_runner = CronRunner::with_observability(
                cron_registry,
                cron_pool,
                cron_http,
                cron_config,
                observability.clone(),
            );
            handles.push(tokio::spawn(async move {
                if let Err(e) = cron_runner.run().await {
                    tracing::error!("Cron runner error: {}", e);
                }
            }));
            tracing::info!("Cron scheduler started");
        }

        // --- Workflow scheduler (scheduler nodes only) ----------------------
        // Cancellation token created unconditionally so teardown can cancel it
        // without checking roles.
        let workflow_shutdown_token = CancellationToken::new();
        if roles.contains(&NodeRole::Scheduler) {
            let scheduler_executor = Arc::new(WorkflowExecutor::new(
                Arc::new(self.workflow_registry.clone()),
                pool.clone(),
                http_client.clone(),
            ));
            let event_store = Arc::new(EventStore::new(pool.clone()));
            let scheduler = WorkflowScheduler::new(
                pool.clone(),
                scheduler_executor,
                event_store,
                WorkflowSchedulerConfig::default(),
            );
            let shutdown_token = workflow_shutdown_token.clone();
            handles.push(tokio::spawn(async move {
                scheduler.run(shutdown_token).await;
            }));
            tracing::info!("Workflow scheduler started");
        }

        // --- HTTP gateway (gateway nodes only) ------------------------------
        let mut reactor_handle = None;
        // Dispatchers are built unconditionally; the gateway/dashboard routes
        // below use them to trigger jobs and workflows.
        let job_queue = JobQueue::new(pool.clone());
        let job_dispatcher = Arc::new(JobDispatcher::new(job_queue, self.job_registry.clone()));
        let workflow_executor = Arc::new(WorkflowExecutor::new(
            Arc::new(self.workflow_registry.clone()),
            pool.clone(),
            http_client.clone(),
        ));
        if roles.contains(&NodeRole::Gateway) {
            let gateway_config = RuntimeGatewayConfig {
                port: self.config.gateway.port,
                max_connections: self.config.gateway.max_connections,
                request_timeout_secs: self.config.gateway.request_timeout_secs,
                // NOTE(review): CORS is enabled for all origins ("*") — confirm
                // this is intended for production deployments.
                cors_enabled: true,
                cors_origins: vec!["*".to_string()],
                auth: AuthConfig::default(),
            };
            let dashboard_state = DashboardState {
                pool: pool.clone(),
                config: DashboardConfig::default(),
                job_registry: self.job_registry.clone(),
                cron_registry: self.cron_registry.clone(),
                workflow_registry: self.workflow_registry.clone(),
                job_dispatcher: Some(job_dispatcher.clone()),
                workflow_executor: Some(workflow_executor.clone()),
            };
            let gateway = GatewayServer::with_observability(
                gateway_config,
                self.function_registry.clone(),
                pool.clone(),
                observability.clone(),
            )
            .with_job_dispatcher(job_dispatcher.clone())
            .with_workflow_dispatcher(workflow_executor.clone());
            // Reactor powers real-time updates; failure to start is non-fatal
            // and simply leaves `reactor_handle` empty.
            let reactor = gateway.reactor();
            if let Err(e) = reactor.start().await {
                tracing::error!("Failed to start reactor: {}", e);
            } else {
                tracing::info!("Reactor started for real-time updates");
                reactor_handle = Some(reactor);
            }
            let mut router = gateway.router();
            // Mount the dashboard UI and its JSON API under reserved prefixes.
            router = router
                .nest(
                    "/_dashboard",
                    create_dashboard_router(dashboard_state.clone()),
                )
                .nest("/_api", create_api_router(dashboard_state));
            if let Some(handler) = self.frontend_handler {
                use axum::routing::get;
                // NOTE(review): the fallback is GET-only; non-GET requests to
                // unmatched paths will get 405 instead of the frontend —
                // confirm this is intended.
                router = router.fallback(get(handler));
                tracing::info!("Frontend handler enabled - serving embedded assets");
            }
            let addr = gateway.addr();
            handles.push(tokio::spawn(async move {
                tracing::info!("Gateway server listening on {}", addr);
                // Bind failure is a fatal configuration error for the node.
                let listener = tokio::net::TcpListener::bind(addr)
                    .await
                    .expect("Failed to bind");
                if let Err(e) = axum::serve(listener, router).await {
                    tracing::error!("Gateway server error: {}", e);
                }
            }));
            tracing::info!("HTTP gateway started on port {}", self.config.gateway.port);
        }
        if roles.contains(&NodeRole::Gateway) {
            // NOTE(review): the WebSocket server is constructed and then
            // immediately dropped (`_ws_server`); nothing here serves it —
            // confirm whether it is wired up elsewhere or still unimplemented.
            let ws_config = WebSocketConfig::default();
            let _ws_server = WebSocketServer::new(node_id, ws_config);
            tracing::info!("WebSocket server initialized");
        }
        tracing::info!("FORGE runtime started successfully");
        tracing::info!(" Node ID: {}", node_id);
        tracing::info!(" Roles: {:?}", roles);

        // --- Wait for shutdown ----------------------------------------------
        // Block until either Ctrl-C or a programmatic `shutdown()` call.
        let mut shutdown_rx = self.shutdown_tx.subscribe();
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("Received shutdown signal");
            }
            _ = shutdown_rx.recv() => {
                tracing::info!("Received shutdown notification");
            }
        }

        // --- Graceful teardown ----------------------------------------------
        tracing::info!("Starting graceful shutdown...");
        workflow_shutdown_token.cancel();
        tracing::info!("Workflow scheduler stopped");
        // GracefulShutdown was built with the node registry and leader
        // election — presumably it deregisters the node and releases
        // leadership; confirm against its implementation.
        if let Err(e) = shutdown.shutdown().await {
            tracing::warn!("Shutdown error: {}", e);
        }
        if let Some(ref election) = leader_election {
            election.stop();
        }
        if let Some(ref reactor) = reactor_handle {
            reactor.stop();
            tracing::info!("Reactor stopped");
        }
        observability.shutdown().await;
        tracing::info!("Observability shutdown complete");
        if let Some(ref db) = self.db {
            db.close().await;
        }
        tracing::info!("FORGE runtime stopped");
        Ok(())
    }

    /// Signal a running `run()` to begin graceful shutdown.
    ///
    /// The send result is ignored: an error only means no `run()` is
    /// currently listening, which makes this call a harmless no-op.
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(());
    }
}
/// Builder for [`Forge`]. Collects configuration, registries, and migrations
/// before [`ForgeBuilder::build`] assembles the runtime.
pub struct ForgeBuilder {
    /// Required runtime configuration; `build()` fails if left unset.
    config: Option<ForgeConfig>,
    /// Functions to serve from the gateway.
    function_registry: FunctionRegistry,
    /// Background job handlers.
    job_registry: JobRegistry,
    /// Cron entries; wrapped in an `Arc` by `build()`.
    cron_registry: CronRegistry,
    /// Workflow definitions.
    workflow_registry: WorkflowRegistry,
    /// Directory SQL migrations are loaded from (defaults to `migrations`).
    migrations_dir: PathBuf,
    /// Migrations registered in code, run after the on-disk ones.
    extra_migrations: Vec<Migration>,
    /// Optional fallback handler for serving an embedded frontend.
    frontend_handler: Option<FrontendHandler>,
}
impl ForgeBuilder {
    /// Create an empty builder: no configuration, empty registries, and a
    /// default migrations directory of `migrations` (relative to the CWD).
    pub fn new() -> Self {
        Self {
            config: None,
            function_registry: FunctionRegistry::new(),
            job_registry: JobRegistry::new(),
            cron_registry: CronRegistry::new(),
            workflow_registry: WorkflowRegistry::new(),
            migrations_dir: PathBuf::from("migrations"),
            extra_migrations: Vec::new(),
            frontend_handler: None,
        }
    }

    /// Override the directory that SQL migrations are loaded from at startup.
    pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.migrations_dir = path.into();
        self
    }

    /// Register an additional in-code migration, run after the on-disk ones.
    pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
        self.extra_migrations.push(Migration::new(name, sql));
        self
    }

    /// Install a fallback HTTP handler used to serve an embedded frontend.
    ///
    /// Returns `&mut Self` so the call can be chained like the other builder
    /// methods (it previously returned `()`); existing callers that invoke it
    /// as a statement are unaffected.
    pub fn frontend_handler(&mut self, handler: FrontendHandler) -> &mut Self {
        self.frontend_handler = Some(handler);
        self
    }

    /// Set the runtime configuration. Required before `build()`.
    pub fn config(mut self, config: ForgeConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Mutable access to the function registry for registration.
    pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
        &mut self.function_registry
    }

    /// Mutable access to the job registry for registration.
    pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
        &mut self.job_registry
    }

    /// Mutable access to the cron registry for registration.
    pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
        &mut self.cron_registry
    }

    /// Mutable access to the workflow registry for registration.
    pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
        &mut self.workflow_registry
    }

    /// Assemble the [`Forge`] runtime.
    ///
    /// # Errors
    /// Returns `ForgeError::Config` if no configuration was supplied.
    pub fn build(self) -> Result<Forge> {
        let config = self
            .config
            .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
        // Capacity 1 is sufficient: the shutdown signal is a one-shot event.
        let (shutdown_tx, _) = broadcast::channel(1);
        Ok(Forge {
            config,
            db: None,
            // Placeholder id; `Forge::run` replaces it with the registered
            // node's id.
            node_id: NodeId::new(),
            function_registry: self.function_registry,
            job_registry: self.job_registry,
            cron_registry: Arc::new(self.cron_registry),
            workflow_registry: self.workflow_registry,
            shutdown_tx,
            migrations_dir: self.migrations_dir,
            extra_migrations: self.extra_migrations,
            observability: None,
            frontend_handler: self.frontend_handler,
        })
    }
}
impl Default for ForgeBuilder {
fn default() -> Self {
Self::new()
}
}
/// Translate a configuration-level node role into its cluster-level twin.
///
/// The two enums mirror each other one-to-one; this mapping keeps the config
/// layer (`forge_core::config`) decoupled from the cluster layer
/// (`forge_core::cluster`). The match is exhaustive on purpose so adding a
/// variant to either enum forces a compile error here.
fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
    match role {
        ConfigNodeRole::Scheduler => NodeRole::Scheduler,
        ConfigNodeRole::Worker => NodeRole::Worker,
        ConfigNodeRole::Function => NodeRole::Function,
        ConfigNodeRole::Gateway => NodeRole::Gateway,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_forge_builder_new() {
        // A freshly created builder carries no configuration yet.
        assert!(ForgeBuilder::new().config.is_none());
    }

    #[test]
    fn test_forge_builder_requires_config() {
        // Building without calling `.config(...)` must be rejected.
        assert!(ForgeBuilder::new().build().is_err());
    }

    #[test]
    fn test_forge_builder_with_config() {
        let cfg = ForgeConfig::default_with_database_url("postgres://localhost/test");
        assert!(ForgeBuilder::new().config(cfg).build().is_ok());
    }

    #[test]
    fn test_config_role_conversion() {
        // Every config-layer role maps to the matching cluster-layer role.
        let cases = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
            (ConfigNodeRole::Function, NodeRole::Function),
        ];
        for (input, expected) in cases {
            assert_eq!(config_role_to_node_role(&input), expected);
        }
    }
}