1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::function::{ForgeMutation, ForgeQuery};
29use forge_core::mcp::ForgeMcpTool;
30use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
31
32use forge_runtime::cluster::{
33 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
34 ShutdownConfig,
35};
36use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
37use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
38use forge_runtime::db::Database;
39use forge_runtime::function::FunctionRegistry;
40use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
41use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
42use forge_runtime::mcp::McpToolRegistry;
43use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
44use forge_runtime::workflow::{
45 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
46};
47use tokio_util::sync::CancellationToken;
48
/// Plain function pointer that turns an incoming request into a boxed, `Send`
/// future resolving to a response.
///
/// When one is configured, [`Forge::run`] installs it as the router's GET
/// fallback.
pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
51
/// Convenience re-exports intended to be glob-imported by application code.
pub mod prelude {
    // Date/time and identifier types.
    pub use chrono::{DateTime, Utc};
    pub use uuid::Uuid;

    // Serialization support.
    pub use serde::{Deserialize, Serialize};
    pub use serde_json;

    /// UTC timestamp alias.
    pub type Timestamp = DateTime<Utc>;

    // Core traits, contexts and configuration re-exported from `forge_core`.
    pub use forge_core::cluster::NodeRole;
    pub use forge_core::config::ForgeConfig;
    pub use forge_core::cron::{CronContext, ForgeCron};
    pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
    pub use forge_core::env::EnvAccess;
    pub use forge_core::error::{ForgeError, Result};
    pub use forge_core::function::{
        AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
    };
    pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
    pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
    pub use forge_core::realtime::Delta;
    pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
    pub use forge_core::schemars::JsonSchema;
    pub use forge_core::types::Upload;
    pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
    pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};

    // Re-exported so callers can build custom routes and frontend handlers.
    pub use axum;

    // The runtime itself and its builder.
    pub use crate::{Forge, ForgeBuilder};
}
89
/// A fully configured Forge node, produced by [`ForgeBuilder::build`] and
/// driven by [`Forge::run`].
pub struct Forge {
    /// Configuration supplied to the builder.
    config: ForgeConfig,
    /// Database handle; `None` until `run` has connected.
    db: Option<Database>,
    /// Node identifier; a fresh id at build time, replaced in `run` by the id
    /// of the local `NodeInfo`.
    node_id: NodeId,
    /// Registered queries and mutations.
    function_registry: FunctionRegistry,
    /// Registered MCP tools.
    mcp_registry: McpToolRegistry,
    /// Registered background jobs.
    job_registry: JobRegistry,
    /// Registered cron tasks (shared with the cron runner).
    cron_registry: Arc<CronRegistry>,
    /// Registered workflows.
    workflow_registry: WorkflowRegistry,
    /// Registered daemons (shared with the daemon runner).
    daemon_registry: Arc<DaemonRegistry>,
    /// Registered webhooks (shared with the webhook router state).
    webhook_registry: Arc<WebhookRegistry>,
    /// Broadcast channel used to request shutdown.
    shutdown_tx: broadcast::Sender<()>,
    /// Directory from which migrations are loaded at startup.
    migrations_dir: PathBuf,
    /// Migrations added programmatically; appended after the directory ones.
    extra_migrations: Vec<Migration>,
    /// Optional GET fallback handler for the gateway router.
    frontend_handler: Option<FrontendHandler>,
    /// Optional user router merged into the gateway router.
    custom_routes: Option<Router>,
}
112
113impl Forge {
114 pub fn builder() -> ForgeBuilder {
116 ForgeBuilder::new()
117 }
118
119 pub fn node_id(&self) -> NodeId {
121 self.node_id
122 }
123
124 pub fn config(&self) -> &ForgeConfig {
126 &self.config
127 }
128
129 pub fn function_registry(&self) -> &FunctionRegistry {
131 &self.function_registry
132 }
133
134 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
136 &mut self.function_registry
137 }
138
139 pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
141 &mut self.mcp_registry
142 }
143
144 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
146 self.mcp_registry.register::<T>();
147 self
148 }
149
150 pub fn job_registry(&self) -> &JobRegistry {
152 &self.job_registry
153 }
154
155 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
157 &mut self.job_registry
158 }
159
160 pub fn cron_registry(&self) -> Arc<CronRegistry> {
162 self.cron_registry.clone()
163 }
164
165 pub fn workflow_registry(&self) -> &WorkflowRegistry {
167 &self.workflow_registry
168 }
169
170 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
172 &mut self.workflow_registry
173 }
174
175 pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
177 self.daemon_registry.clone()
178 }
179
180 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
182 self.webhook_registry.clone()
183 }
184
185 pub async fn run(mut self) -> Result<()> {
187 let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
189 &self.config.observability,
190 &self.config.project.name,
191 &self.config.project.version,
192 );
193 match forge_runtime::init_telemetry(
194 &telemetry_config,
195 &self.config.project.name,
196 &self.config.observability.log_level,
197 ) {
198 Ok(true) => {}
199 Ok(false) => {
200 }
202 Err(e) => {
203 eprintln!("forge: failed to initialize telemetry: {e}");
204 }
205 }
206
207 tracing::debug!("Connecting to database");
208
209 let db = Database::from_config(&self.config.database).await?;
211 let pool = db.primary().clone();
212 self.db = Some(db);
213
214 tracing::debug!("Database connected");
215
216 let runner = MigrationRunner::new(pool.clone());
219
220 let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
222 user_migrations.extend(self.extra_migrations.clone());
223
224 runner.run(user_migrations).await?;
225 tracing::debug!("Migrations applied");
226
227 let hostname = hostname::get()
229 .map(|h| h.to_string_lossy().to_string())
230 .unwrap_or_else(|_| "unknown".to_string());
231
232 let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
233 let roles: Vec<NodeRole> = self
234 .config
235 .node
236 .roles
237 .iter()
238 .map(config_role_to_node_role)
239 .collect();
240
241 let node_info = NodeInfo::new_local(
242 hostname,
243 ip_address,
244 self.config.gateway.port,
245 self.config.gateway.grpc_port,
246 roles.clone(),
247 self.config.node.worker_capabilities.clone(),
248 env!("CARGO_PKG_VERSION").to_string(),
249 );
250
251 let node_id = node_info.id;
252 self.node_id = node_id;
253
254 let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
256
257 if let Err(e) = node_registry.register().await {
259 tracing::debug!("Failed to register node (tables may not exist): {}", e);
260 }
261
262 if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
264 tracing::debug!("Failed to set node status: {}", e);
265 }
266
267 let leader_election = if roles.contains(&NodeRole::Scheduler) {
269 let election = Arc::new(LeaderElection::new(
270 pool.clone(),
271 node_id,
272 LeaderRole::Scheduler,
273 LeaderConfig::default(),
274 ));
275
276 if let Err(e) = election.try_become_leader().await {
278 tracing::debug!("Failed to acquire leadership: {}", e);
279 }
280
281 Some(election)
282 } else {
283 None
284 };
285
286 let shutdown = Arc::new(GracefulShutdown::new(
288 node_registry.clone(),
289 leader_election.clone(),
290 ShutdownConfig::default(),
291 ));
292
293 let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());
295
296 let mut handles = Vec::new();
298
299 {
301 let heartbeat_pool = pool.clone();
302 let heartbeat_node_id = node_id;
303 let config = HeartbeatConfig::default();
304 handles.push(tokio::spawn(async move {
305 let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
306 heartbeat.run().await;
307 }));
308 }
309
310 if let Some(ref election) = leader_election {
312 let election = election.clone();
313 handles.push(tokio::spawn(async move {
314 election.run().await;
315 }));
316 }
317
318 if roles.contains(&NodeRole::Worker) {
320 let job_queue = JobQueue::new(pool.clone());
321 let worker_config = WorkerConfig {
322 id: Some(node_id.as_uuid()),
323 capabilities: self.config.node.worker_capabilities.clone(),
324 max_concurrent: self.config.worker.max_concurrent_jobs,
325 poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
326 ..Default::default()
327 };
328
329 let mut worker = Worker::new(
330 worker_config,
331 job_queue,
332 self.job_registry.clone(),
333 pool.clone(),
334 );
335
336 handles.push(tokio::spawn(async move {
337 if let Err(e) = worker.run().await {
338 tracing::error!("Worker error: {}", e);
339 }
340 }));
341
342 tracing::debug!("Job worker started");
343 }
344
345 if roles.contains(&NodeRole::Scheduler) {
347 let cron_registry = self.cron_registry.clone();
348 let cron_pool = pool.clone();
349 let cron_http = http_client.clone();
350 let cron_leader_election = leader_election.clone();
351
352 let cron_config = CronRunnerConfig {
353 poll_interval: Duration::from_secs(1),
354 node_id: node_id.as_uuid(),
355 is_leader: cron_leader_election.is_none(),
356 leader_election: cron_leader_election,
357 run_stale_threshold: Duration::from_secs(15 * 60),
358 };
359
360 let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);
361
362 handles.push(tokio::spawn(async move {
363 if let Err(e) = cron_runner.run().await {
364 tracing::error!("Cron runner error: {}", e);
365 }
366 }));
367
368 tracing::debug!("Cron scheduler started");
369 }
370
371 let workflow_shutdown_token = CancellationToken::new();
373 if roles.contains(&NodeRole::Scheduler) {
374 let scheduler_executor = Arc::new(WorkflowExecutor::new(
375 Arc::new(self.workflow_registry.clone()),
376 pool.clone(),
377 http_client.clone(),
378 ));
379 let event_store = Arc::new(EventStore::new(pool.clone()));
380 let scheduler = WorkflowScheduler::new(
381 pool.clone(),
382 scheduler_executor,
383 event_store,
384 WorkflowSchedulerConfig::default(),
385 );
386
387 let shutdown_token = workflow_shutdown_token.clone();
388 handles.push(tokio::spawn(async move {
389 scheduler.run(shutdown_token).await;
390 }));
391
392 tracing::debug!("Workflow scheduler started");
393 }
394
395 let job_queue_for_dispatch = JobQueue::new(pool.clone());
397 let job_dispatcher = Arc::new(JobDispatcher::new(
398 job_queue_for_dispatch,
399 self.job_registry.clone(),
400 ));
401 let workflow_executor = Arc::new(WorkflowExecutor::new(
402 Arc::new(self.workflow_registry.clone()),
403 pool.clone(),
404 http_client.clone(),
405 ));
406
407 if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
409 let daemon_registry = self.daemon_registry.clone();
410 let daemon_pool = pool.clone();
411 let daemon_http = http_client.clone();
412 let daemon_shutdown_rx = self.shutdown_tx.subscribe();
413
414 let daemon_runner = DaemonRunner::new(
415 daemon_registry,
416 daemon_pool,
417 daemon_http,
418 node_id.as_uuid(),
419 daemon_shutdown_rx,
420 )
421 .with_job_dispatch(job_dispatcher.clone())
422 .with_workflow_dispatch(workflow_executor.clone());
423
424 handles.push(tokio::spawn(async move {
425 if let Err(e) = daemon_runner.run().await {
426 tracing::error!("Daemon runner error: {}", e);
427 }
428 }));
429
430 tracing::debug!("Daemon runner started");
431 }
432
433 let mut reactor_handle = None;
435
436 let gateway_port = std::env::var("PORT")
438 .ok()
439 .and_then(|p| p.parse::<u16>().ok())
440 .unwrap_or(self.config.gateway.port);
441
442 if roles.contains(&NodeRole::Gateway) {
444 let gateway_config = RuntimeGatewayConfig {
445 port: gateway_port,
446 max_connections: self.config.gateway.max_connections,
447 request_timeout_secs: self.config.gateway.request_timeout_secs,
448 cors_enabled: self.config.gateway.cors_enabled
449 || !self.config.gateway.cors_origins.is_empty(),
450 cors_origins: self.config.gateway.cors_origins.clone(),
451 auth: AuthConfig::from_forge_config(&self.config.auth)
452 .map_err(|e| ForgeError::Config(e.to_string()))?,
453 mcp: self.config.mcp.clone(),
454 };
455
456 let gateway = GatewayServer::new(
458 gateway_config,
459 self.function_registry.clone(),
460 self.db.clone().expect("Database must be initialized"),
461 )
462 .with_job_dispatcher(job_dispatcher.clone())
463 .with_workflow_dispatcher(workflow_executor.clone())
464 .with_mcp_registry(self.mcp_registry.clone());
465
466 let reactor = gateway.reactor();
468 if let Err(e) = reactor.start().await {
469 tracing::error!("Failed to start reactor: {}", e);
470 } else {
471 tracing::debug!("Reactor started");
472 reactor_handle = Some(reactor);
473 }
474
475 let api_router = gateway.router();
477
478 let mut router = Router::new().nest("/_api", api_router);
480
481 if !self.webhook_registry.is_empty() {
483 use axum::routing::post;
484 use tower_http::cors::{Any, CorsLayer};
485
486 let webhook_state = Arc::new(
487 WebhookState::new(self.webhook_registry.clone(), pool.clone())
488 .with_job_dispatcher(job_dispatcher.clone()),
489 );
490
491 let webhook_cors = if self.config.gateway.cors_enabled
494 || !self.config.gateway.cors_origins.is_empty()
495 {
496 if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
497 CorsLayer::new()
498 .allow_origin(Any)
499 .allow_methods(Any)
500 .allow_headers(Any)
501 } else {
502 let origins: Vec<_> = self
503 .config
504 .gateway
505 .cors_origins
506 .iter()
507 .filter_map(|o| o.parse().ok())
508 .collect();
509 CorsLayer::new()
510 .allow_origin(origins)
511 .allow_methods(Any)
512 .allow_headers(Any)
513 }
514 } else {
515 CorsLayer::new()
516 };
517
518 let webhook_router = Router::new()
519 .route("/{*path}", post(webhook_handler).with_state(webhook_state))
520 .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
521 .layer(
522 tower::ServiceBuilder::new()
523 .layer(axum::error_handling::HandleErrorLayer::new(
524 |err: tower::BoxError| async move {
525 if err.is::<tower::timeout::error::Elapsed>() {
526 return (
527 axum::http::StatusCode::REQUEST_TIMEOUT,
528 "Request timed out",
529 );
530 }
531 (
532 axum::http::StatusCode::SERVICE_UNAVAILABLE,
533 "Server overloaded",
534 )
535 },
536 ))
537 .layer(tower::limit::ConcurrencyLimitLayer::new(
538 self.config.gateway.max_connections,
539 ))
540 .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
541 self.config.gateway.request_timeout_secs,
542 ))),
543 )
544 .layer(webhook_cors);
545
546 router = router.nest("/_api/webhooks", webhook_router);
547
548 tracing::debug!(
549 webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
550 "Webhook routes registered"
551 );
552 }
553
554 if let Some(custom) = self.custom_routes.take() {
556 router = router.merge(custom);
557 tracing::debug!("Custom routes merged");
558 }
559
560 if let Some(handler) = self.frontend_handler {
562 use axum::routing::get;
563 router = router.fallback(get(handler));
564 tracing::debug!("Frontend handler enabled");
565 }
566
567 let addr = gateway.addr();
568
569 handles.push(tokio::spawn(async move {
570 tracing::debug!(addr = %addr, "Gateway server binding");
571 let listener = tokio::net::TcpListener::bind(addr)
572 .await
573 .expect("Failed to bind");
574 if let Err(e) = axum::serve(listener, router).await {
575 tracing::error!("Gateway server error: {}", e);
576 }
577 }));
578 }
579
580 tracing::info!(
581 node_id = %node_id,
582 roles = ?roles,
583 port = gateway_port,
584 "Forge started"
585 );
586
587 let mut shutdown_rx = self.shutdown_tx.subscribe();
589
590 tokio::select! {
591 _ = tokio::signal::ctrl_c() => {
592 tracing::debug!("Received ctrl-c");
593 }
594 _ = shutdown_rx.recv() => {
595 tracing::debug!("Received shutdown notification");
596 }
597 }
598
599 tracing::debug!("Graceful shutdown starting");
601
602 workflow_shutdown_token.cancel();
604
605 if let Err(e) = shutdown.shutdown().await {
606 tracing::warn!(error = %e, "Shutdown error");
607 }
608
609 if let Some(ref election) = leader_election {
611 election.stop();
612 }
613
614 if let Some(ref reactor) = reactor_handle {
616 reactor.stop();
617 }
618
619 if let Some(ref db) = self.db {
621 db.close().await;
622 }
623
624 forge_runtime::shutdown_telemetry();
625 tracing::info!("Forge stopped");
626 Ok(())
627 }
628
629 pub fn shutdown(&self) {
631 let _ = self.shutdown_tx.send(());
632 }
633}
634
/// Collects configuration, registries and optional extras, then produces a
/// [`Forge`] through [`ForgeBuilder::build`].
pub struct ForgeBuilder {
    /// Runtime configuration; `build` fails when this is still `None`.
    config: Option<ForgeConfig>,
    /// Registered queries and mutations.
    function_registry: FunctionRegistry,
    /// Registered MCP tools.
    mcp_registry: McpToolRegistry,
    /// Registered background jobs.
    job_registry: JobRegistry,
    /// Registered cron tasks.
    cron_registry: CronRegistry,
    /// Registered workflows.
    workflow_registry: WorkflowRegistry,
    /// Registered daemons.
    daemon_registry: DaemonRegistry,
    /// Registered webhooks.
    webhook_registry: WebhookRegistry,
    /// Directory to load migrations from; defaults to `migrations`.
    migrations_dir: PathBuf,
    /// Migrations added programmatically.
    extra_migrations: Vec<Migration>,
    /// Optional GET fallback handler for the gateway router.
    frontend_handler: Option<FrontendHandler>,
    /// Optional user router merged into the gateway router.
    custom_routes: Option<Router>,
}
650
651impl ForgeBuilder {
652 pub fn new() -> Self {
654 Self {
655 config: None,
656 function_registry: FunctionRegistry::new(),
657 mcp_registry: McpToolRegistry::new(),
658 job_registry: JobRegistry::new(),
659 cron_registry: CronRegistry::new(),
660 workflow_registry: WorkflowRegistry::new(),
661 daemon_registry: DaemonRegistry::new(),
662 webhook_registry: WebhookRegistry::new(),
663 migrations_dir: PathBuf::from("migrations"),
664 extra_migrations: Vec::new(),
665 frontend_handler: None,
666 custom_routes: None,
667 }
668 }
669
670 pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
676 self.migrations_dir = path.into();
677 self
678 }
679
680 pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
685 self.extra_migrations.push(Migration::new(name, sql));
686 self
687 }
688
689 pub fn frontend_handler(&mut self, handler: FrontendHandler) {
694 self.frontend_handler = Some(handler);
695 }
696
697 pub fn custom_routes(&mut self, router: Router) {
712 self.custom_routes = Some(router);
713 }
714
715 pub fn config(mut self, config: ForgeConfig) -> Self {
717 self.config = Some(config);
718 self
719 }
720
721 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
723 &mut self.function_registry
724 }
725
726 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
728 &mut self.job_registry
729 }
730
731 pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
733 &mut self.mcp_registry
734 }
735
736 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
738 self.mcp_registry.register::<T>();
739 self
740 }
741
742 pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
744 &mut self.cron_registry
745 }
746
747 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
749 &mut self.workflow_registry
750 }
751
752 pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
754 &mut self.daemon_registry
755 }
756
757 pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
759 &mut self.webhook_registry
760 }
761
762 pub fn register_query<Q: ForgeQuery>(&mut self) -> &mut Self
764 where
765 Q::Args: serde::de::DeserializeOwned + Send + 'static,
766 Q::Output: serde::Serialize + Send + 'static,
767 {
768 self.function_registry.register_query::<Q>();
769 self
770 }
771
772 pub fn register_mutation<M: ForgeMutation>(&mut self) -> &mut Self
774 where
775 M::Args: serde::de::DeserializeOwned + Send + 'static,
776 M::Output: serde::Serialize + Send + 'static,
777 {
778 self.function_registry.register_mutation::<M>();
779 self
780 }
781
782 pub fn register_job<J: forge_core::ForgeJob>(&mut self) -> &mut Self
784 where
785 J::Args: serde::de::DeserializeOwned + Send + 'static,
786 J::Output: serde::Serialize + Send + 'static,
787 {
788 self.job_registry.register::<J>();
789 self
790 }
791
792 pub fn register_cron<C: forge_core::ForgeCron>(&mut self) -> &mut Self {
794 self.cron_registry.register::<C>();
795 self
796 }
797
798 pub fn register_workflow<W: forge_core::ForgeWorkflow>(&mut self) -> &mut Self
800 where
801 W::Input: serde::de::DeserializeOwned,
802 W::Output: serde::Serialize,
803 {
804 self.workflow_registry.register::<W>();
805 self
806 }
807
808 pub fn register_daemon<D: forge_core::ForgeDaemon>(&mut self) -> &mut Self {
810 self.daemon_registry.register::<D>();
811 self
812 }
813
814 pub fn register_webhook<W: forge_core::ForgeWebhook>(&mut self) -> &mut Self {
816 self.webhook_registry.register::<W>();
817 self
818 }
819
820 pub fn build(self) -> Result<Forge> {
822 let config = self
823 .config
824 .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
825
826 let (shutdown_tx, _) = broadcast::channel(1);
827
828 Ok(Forge {
829 config,
830 db: None,
831 node_id: NodeId::new(),
832 function_registry: self.function_registry,
833 mcp_registry: self.mcp_registry,
834 job_registry: self.job_registry,
835 cron_registry: Arc::new(self.cron_registry),
836 workflow_registry: self.workflow_registry,
837 daemon_registry: Arc::new(self.daemon_registry),
838 webhook_registry: Arc::new(self.webhook_registry),
839 shutdown_tx,
840 migrations_dir: self.migrations_dir,
841 extra_migrations: self.extra_migrations,
842 frontend_handler: self.frontend_handler,
843 custom_routes: self.custom_routes,
844 })
845 }
846}
847
848impl Default for ForgeBuilder {
849 fn default() -> Self {
850 Self::new()
851 }
852}
853
854fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
856 match role {
857 ConfigNodeRole::Gateway => NodeRole::Gateway,
858 ConfigNodeRole::Function => NodeRole::Function,
859 ConfigNodeRole::Worker => NodeRole::Worker,
860 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
861 }
862}
863
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::pin::Pin;

    use forge_core::mcp::{McpToolAnnotations, McpToolInfo};

    /// Minimal MCP tool used to exercise registration on the builder.
    struct TestMcpTool;

    impl ForgeMcpTool for TestMcpTool {
        type Args = serde_json::Value;
        type Output = serde_json::Value;

        fn info() -> McpToolInfo {
            McpToolInfo {
                name: "test.mcp.tool",
                title: None,
                description: None,
                required_role: None,
                is_public: false,
                timeout: None,
                rate_limit_requests: None,
                rate_limit_per_secs: None,
                rate_limit_key: None,
                annotations: McpToolAnnotations::default(),
                icons: &[],
            }
        }

        fn execute(
            _ctx: &forge_core::McpToolContext,
            _args: Self::Args,
        ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
            Box::pin(async {
                let payload = serde_json::json!({ "ok": true });
                Ok(payload)
            })
        }
    }

    /// A new builder starts without configuration.
    #[test]
    fn test_forge_builder_new() {
        assert!(ForgeBuilder::new().config.is_none());
    }

    /// Building without configuration is rejected.
    #[test]
    fn test_forge_builder_requires_config() {
        assert!(ForgeBuilder::new().build().is_err());
    }

    /// Building with configuration succeeds.
    #[test]
    fn test_forge_builder_with_config() {
        let builder = ForgeBuilder::new().config(ForgeConfig::default_with_database_url(
            "postgres://localhost/test",
        ));
        assert!(builder.build().is_ok());
    }

    /// Registering one MCP tool leaves exactly one entry in the registry.
    #[test]
    fn test_forge_builder_register_mcp_tool() {
        let mut builder = ForgeBuilder::new();
        builder.register_mcp_tool::<TestMcpTool>();
        let registered = builder.mcp_registry.len();
        assert_eq!(registered, 1);
    }

    /// Every configuration role maps onto the same-named cluster role.
    #[test]
    fn test_config_role_conversion() {
        let cases = [
            (ConfigNodeRole::Gateway, NodeRole::Gateway),
            (ConfigNodeRole::Worker, NodeRole::Worker),
            (ConfigNodeRole::Scheduler, NodeRole::Scheduler),
            (ConfigNodeRole::Function, NodeRole::Function),
        ];
        for (source, expected) in cases.iter() {
            assert_eq!(&config_role_to_node_role(source), expected);
        }
    }
}