1use std::future::Future;
12use std::net::IpAddr;
13use std::path::PathBuf;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17
18use axum::Router;
19use axum::body::Body;
20use axum::http::Request;
21use axum::response::Response;
22use tokio::sync::broadcast;
23
24use forge_core::CircuitBreakerClient;
25use forge_core::cluster::{LeaderRole, NodeId, NodeInfo, NodeRole, NodeStatus};
26use forge_core::config::{ForgeConfig, NodeRole as ConfigNodeRole};
27use forge_core::error::{ForgeError, Result};
28use forge_core::mcp::ForgeMcpTool;
29use forge_runtime::migrations::{Migration, MigrationRunner, load_migrations_from_dir};
30
31use forge_runtime::cluster::{
32 GracefulShutdown, HeartbeatConfig, HeartbeatLoop, LeaderConfig, LeaderElection, NodeRegistry,
33 ShutdownConfig,
34};
35use forge_runtime::cron::{CronRegistry, CronRunner, CronRunnerConfig};
36use forge_runtime::daemon::{DaemonRegistry, DaemonRunner};
37use forge_runtime::db::Database;
38use forge_runtime::function::FunctionRegistry;
39use forge_runtime::gateway::{AuthConfig, GatewayConfig as RuntimeGatewayConfig, GatewayServer};
40use forge_runtime::jobs::{JobDispatcher, JobQueue, JobRegistry, Worker, WorkerConfig};
41use forge_runtime::mcp::McpToolRegistry;
42use forge_runtime::webhook::{WebhookRegistry, WebhookState, webhook_handler};
43use forge_runtime::workflow::{
44 EventStore, WorkflowExecutor, WorkflowRegistry, WorkflowScheduler, WorkflowSchedulerConfig,
45};
46use tokio_util::sync::CancellationToken;
47
48pub type FrontendHandler = fn(Request<Body>) -> Pin<Box<dyn Future<Output = Response> + Send>>;
50
51pub mod prelude {
53 pub use chrono::{DateTime, Utc};
55 pub use uuid::Uuid;
56
57 pub use serde::{Deserialize, Serialize};
59 pub use serde_json;
60
61 pub type Timestamp = DateTime<Utc>;
63
64 pub use forge_core::cluster::NodeRole;
66 pub use forge_core::config::ForgeConfig;
67 pub use forge_core::cron::{CronContext, ForgeCron};
68 pub use forge_core::daemon::{DaemonContext, ForgeDaemon};
69 pub use forge_core::env::EnvAccess;
70 pub use forge_core::error::{ForgeError, Result};
71 pub use forge_core::function::{
72 AuthContext, ForgeMutation, ForgeQuery, MutationContext, QueryContext,
73 };
74 pub use forge_core::job::{ForgeJob, JobContext, JobPriority};
75 pub use forge_core::mcp::{ForgeMcpTool, McpToolContext, McpToolResult};
76 pub use forge_core::realtime::Delta;
77 pub use forge_core::schema::{FieldDef, ModelMeta, SchemaRegistry, TableDef};
78 pub use forge_core::schemars::JsonSchema;
79 pub use forge_core::types::Upload;
80 pub use forge_core::webhook::{ForgeWebhook, WebhookContext, WebhookResult, WebhookSignature};
81 pub use forge_core::workflow::{ForgeWorkflow, WorkflowContext};
82
83 pub use axum;
85
86 pub use crate::{Forge, ForgeBuilder};
87}
88
89pub struct Forge {
91 config: ForgeConfig,
92 db: Option<Database>,
93 node_id: NodeId,
94 function_registry: FunctionRegistry,
95 mcp_registry: McpToolRegistry,
96 job_registry: JobRegistry,
97 cron_registry: Arc<CronRegistry>,
98 workflow_registry: WorkflowRegistry,
99 daemon_registry: Arc<DaemonRegistry>,
100 webhook_registry: Arc<WebhookRegistry>,
101 shutdown_tx: broadcast::Sender<()>,
102 migrations_dir: PathBuf,
104 extra_migrations: Vec<Migration>,
106 frontend_handler: Option<FrontendHandler>,
108 custom_routes: Option<Router>,
110}
111
112impl Forge {
113 pub fn builder() -> ForgeBuilder {
115 ForgeBuilder::new()
116 }
117
118 pub fn node_id(&self) -> NodeId {
120 self.node_id
121 }
122
123 pub fn config(&self) -> &ForgeConfig {
125 &self.config
126 }
127
128 pub fn function_registry(&self) -> &FunctionRegistry {
130 &self.function_registry
131 }
132
133 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
135 &mut self.function_registry
136 }
137
138 pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
140 &mut self.mcp_registry
141 }
142
143 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
145 self.mcp_registry.register::<T>();
146 self
147 }
148
149 pub fn job_registry(&self) -> &JobRegistry {
151 &self.job_registry
152 }
153
154 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
156 &mut self.job_registry
157 }
158
159 pub fn cron_registry(&self) -> Arc<CronRegistry> {
161 self.cron_registry.clone()
162 }
163
164 pub fn workflow_registry(&self) -> &WorkflowRegistry {
166 &self.workflow_registry
167 }
168
169 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
171 &mut self.workflow_registry
172 }
173
174 pub fn daemon_registry(&self) -> Arc<DaemonRegistry> {
176 self.daemon_registry.clone()
177 }
178
179 pub fn webhook_registry(&self) -> Arc<WebhookRegistry> {
181 self.webhook_registry.clone()
182 }
183
184 pub async fn run(mut self) -> Result<()> {
186 let telemetry_config = forge_runtime::TelemetryConfig::from_observability_config(
188 &self.config.observability,
189 &self.config.project.name,
190 &self.config.project.version,
191 );
192 match forge_runtime::init_telemetry(
193 &telemetry_config,
194 &self.config.project.name,
195 &self.config.observability.log_level,
196 ) {
197 Ok(true) => {}
198 Ok(false) => {
199 }
201 Err(e) => {
202 eprintln!("forge: failed to initialize telemetry: {e}");
203 }
204 }
205
206 tracing::debug!("Connecting to database");
207
208 let db = Database::from_config(&self.config.database).await?;
210 let pool = db.primary().clone();
211 self.db = Some(db);
212
213 tracing::debug!("Database connected");
214
215 let runner = MigrationRunner::new(pool.clone());
218
219 let mut user_migrations = load_migrations_from_dir(&self.migrations_dir)?;
221 user_migrations.extend(self.extra_migrations.clone());
222
223 runner.run(user_migrations).await?;
224 tracing::debug!("Migrations applied");
225
226 let hostname = hostname::get()
228 .map(|h| h.to_string_lossy().to_string())
229 .unwrap_or_else(|_| "unknown".to_string());
230
231 let ip_address: IpAddr = "127.0.0.1".parse().expect("valid IP literal");
232 let roles: Vec<NodeRole> = self
233 .config
234 .node
235 .roles
236 .iter()
237 .map(config_role_to_node_role)
238 .collect();
239
240 let node_info = NodeInfo::new_local(
241 hostname,
242 ip_address,
243 self.config.gateway.port,
244 self.config.gateway.grpc_port,
245 roles.clone(),
246 self.config.node.worker_capabilities.clone(),
247 env!("CARGO_PKG_VERSION").to_string(),
248 );
249
250 let node_id = node_info.id;
251 self.node_id = node_id;
252
253 let node_registry = Arc::new(NodeRegistry::new(pool.clone(), node_info));
255
256 if let Err(e) = node_registry.register().await {
258 tracing::debug!("Failed to register node (tables may not exist): {}", e);
259 }
260
261 if let Err(e) = node_registry.set_status(NodeStatus::Active).await {
263 tracing::debug!("Failed to set node status: {}", e);
264 }
265
266 let leader_election = if roles.contains(&NodeRole::Scheduler) {
268 let election = Arc::new(LeaderElection::new(
269 pool.clone(),
270 node_id,
271 LeaderRole::Scheduler,
272 LeaderConfig::default(),
273 ));
274
275 if let Err(e) = election.try_become_leader().await {
277 tracing::debug!("Failed to acquire leadership: {}", e);
278 }
279
280 Some(election)
281 } else {
282 None
283 };
284
285 let shutdown = Arc::new(GracefulShutdown::new(
287 node_registry.clone(),
288 leader_election.clone(),
289 ShutdownConfig::default(),
290 ));
291
292 let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());
294
295 let mut handles = Vec::new();
297
298 {
300 let heartbeat_pool = pool.clone();
301 let heartbeat_node_id = node_id;
302 let config = HeartbeatConfig::default();
303 handles.push(tokio::spawn(async move {
304 let heartbeat = HeartbeatLoop::new(heartbeat_pool, heartbeat_node_id, config);
305 heartbeat.run().await;
306 }));
307 }
308
309 if let Some(ref election) = leader_election {
311 let election = election.clone();
312 handles.push(tokio::spawn(async move {
313 election.run().await;
314 }));
315 }
316
317 if roles.contains(&NodeRole::Worker) {
319 let job_queue = JobQueue::new(pool.clone());
320 let worker_config = WorkerConfig {
321 id: Some(node_id.as_uuid()),
322 capabilities: self.config.node.worker_capabilities.clone(),
323 max_concurrent: self.config.worker.max_concurrent_jobs,
324 poll_interval: Duration::from_millis(self.config.worker.poll_interval_ms),
325 ..Default::default()
326 };
327
328 let mut worker = Worker::new(
329 worker_config,
330 job_queue,
331 self.job_registry.clone(),
332 pool.clone(),
333 );
334
335 handles.push(tokio::spawn(async move {
336 if let Err(e) = worker.run().await {
337 tracing::error!("Worker error: {}", e);
338 }
339 }));
340
341 tracing::debug!("Job worker started");
342 }
343
344 if roles.contains(&NodeRole::Scheduler) {
346 let cron_registry = self.cron_registry.clone();
347 let cron_pool = pool.clone();
348 let cron_http = http_client.clone();
349 let cron_leader_election = leader_election.clone();
350
351 let cron_config = CronRunnerConfig {
352 poll_interval: Duration::from_secs(1),
353 node_id: node_id.as_uuid(),
354 is_leader: cron_leader_election.is_none(),
355 leader_election: cron_leader_election,
356 run_stale_threshold: Duration::from_secs(15 * 60),
357 };
358
359 let cron_runner = CronRunner::new(cron_registry, cron_pool, cron_http, cron_config);
360
361 handles.push(tokio::spawn(async move {
362 if let Err(e) = cron_runner.run().await {
363 tracing::error!("Cron runner error: {}", e);
364 }
365 }));
366
367 tracing::debug!("Cron scheduler started");
368 }
369
370 let workflow_shutdown_token = CancellationToken::new();
372 if roles.contains(&NodeRole::Scheduler) {
373 let scheduler_executor = Arc::new(WorkflowExecutor::new(
374 Arc::new(self.workflow_registry.clone()),
375 pool.clone(),
376 http_client.clone(),
377 ));
378 let event_store = Arc::new(EventStore::new(pool.clone()));
379 let scheduler = WorkflowScheduler::new(
380 pool.clone(),
381 scheduler_executor,
382 event_store,
383 WorkflowSchedulerConfig::default(),
384 );
385
386 let shutdown_token = workflow_shutdown_token.clone();
387 handles.push(tokio::spawn(async move {
388 scheduler.run(shutdown_token).await;
389 }));
390
391 tracing::debug!("Workflow scheduler started");
392 }
393
394 if roles.contains(&NodeRole::Scheduler) && !self.daemon_registry.is_empty() {
396 let daemon_registry = self.daemon_registry.clone();
397 let daemon_pool = pool.clone();
398 let daemon_http = http_client.clone();
399 let daemon_shutdown_rx = self.shutdown_tx.subscribe();
400
401 let daemon_runner = DaemonRunner::new(
402 daemon_registry,
403 daemon_pool,
404 daemon_http,
405 node_id.as_uuid(),
406 daemon_shutdown_rx,
407 );
408
409 handles.push(tokio::spawn(async move {
410 if let Err(e) = daemon_runner.run().await {
411 tracing::error!("Daemon runner error: {}", e);
412 }
413 }));
414
415 tracing::debug!("Daemon runner started");
416 }
417
418 let mut reactor_handle = None;
420
421 let job_queue = JobQueue::new(pool.clone());
423 let job_dispatcher = Arc::new(JobDispatcher::new(job_queue, self.job_registry.clone()));
424 let workflow_executor = Arc::new(WorkflowExecutor::new(
425 Arc::new(self.workflow_registry.clone()),
426 pool.clone(),
427 http_client.clone(),
428 ));
429
430 let gateway_port = std::env::var("PORT")
432 .ok()
433 .and_then(|p| p.parse::<u16>().ok())
434 .unwrap_or(self.config.gateway.port);
435
436 if roles.contains(&NodeRole::Gateway) {
438 let gateway_config = RuntimeGatewayConfig {
439 port: gateway_port,
440 max_connections: self.config.gateway.max_connections,
441 request_timeout_secs: self.config.gateway.request_timeout_secs,
442 cors_enabled: self.config.gateway.cors_enabled
443 || !self.config.gateway.cors_origins.is_empty(),
444 cors_origins: self.config.gateway.cors_origins.clone(),
445 auth: AuthConfig::from_forge_config(&self.config.auth)
446 .map_err(|e| ForgeError::Config(e.to_string()))?,
447 mcp: self.config.mcp.clone(),
448 };
449
450 let gateway = GatewayServer::new(
452 gateway_config,
453 self.function_registry.clone(),
454 self.db.clone().expect("Database must be initialized"),
455 )
456 .with_job_dispatcher(job_dispatcher.clone())
457 .with_workflow_dispatcher(workflow_executor.clone())
458 .with_mcp_registry(self.mcp_registry.clone());
459
460 let reactor = gateway.reactor();
462 if let Err(e) = reactor.start().await {
463 tracing::error!("Failed to start reactor: {}", e);
464 } else {
465 tracing::debug!("Reactor started");
466 reactor_handle = Some(reactor);
467 }
468
469 let api_router = gateway.router();
471
472 let mut router = Router::new().nest("/_api", api_router);
474
475 if !self.webhook_registry.is_empty() {
477 use axum::routing::post;
478 use tower_http::cors::{Any, CorsLayer};
479
480 let webhook_state = Arc::new(
481 WebhookState::new(self.webhook_registry.clone(), pool.clone())
482 .with_job_dispatcher(job_dispatcher.clone()),
483 );
484
485 let webhook_cors = if self.config.gateway.cors_enabled
488 || !self.config.gateway.cors_origins.is_empty()
489 {
490 if self.config.gateway.cors_origins.iter().any(|o| o == "*") {
491 CorsLayer::new()
492 .allow_origin(Any)
493 .allow_methods(Any)
494 .allow_headers(Any)
495 } else {
496 let origins: Vec<_> = self
497 .config
498 .gateway
499 .cors_origins
500 .iter()
501 .filter_map(|o| o.parse().ok())
502 .collect();
503 CorsLayer::new()
504 .allow_origin(origins)
505 .allow_methods(Any)
506 .allow_headers(Any)
507 }
508 } else {
509 CorsLayer::new()
510 };
511
512 let webhook_router = Router::new()
513 .route("/{*path}", post(webhook_handler).with_state(webhook_state))
514 .layer(axum::extract::DefaultBodyLimit::max(1024 * 1024))
515 .layer(
516 tower::ServiceBuilder::new()
517 .layer(axum::error_handling::HandleErrorLayer::new(
518 |err: tower::BoxError| async move {
519 if err.is::<tower::timeout::error::Elapsed>() {
520 return (
521 axum::http::StatusCode::REQUEST_TIMEOUT,
522 "Request timed out",
523 );
524 }
525 (
526 axum::http::StatusCode::SERVICE_UNAVAILABLE,
527 "Server overloaded",
528 )
529 },
530 ))
531 .layer(tower::limit::ConcurrencyLimitLayer::new(
532 self.config.gateway.max_connections,
533 ))
534 .layer(tower::timeout::TimeoutLayer::new(Duration::from_secs(
535 self.config.gateway.request_timeout_secs,
536 ))),
537 )
538 .layer(webhook_cors);
539
540 router = router.nest("/_api/webhooks", webhook_router);
541
542 tracing::debug!(
543 webhooks = ?self.webhook_registry.paths().collect::<Vec<_>>(),
544 "Webhook routes registered"
545 );
546 }
547
548 if let Some(custom) = self.custom_routes.take() {
550 router = router.merge(custom);
551 tracing::debug!("Custom routes merged");
552 }
553
554 if let Some(handler) = self.frontend_handler {
556 use axum::routing::get;
557 router = router.fallback(get(handler));
558 tracing::debug!("Frontend handler enabled");
559 }
560
561 let addr = gateway.addr();
562
563 handles.push(tokio::spawn(async move {
564 tracing::debug!(addr = %addr, "Gateway server binding");
565 let listener = tokio::net::TcpListener::bind(addr)
566 .await
567 .expect("Failed to bind");
568 if let Err(e) = axum::serve(listener, router).await {
569 tracing::error!("Gateway server error: {}", e);
570 }
571 }));
572 }
573
574 tracing::info!(
575 node_id = %node_id,
576 roles = ?roles,
577 port = gateway_port,
578 "Forge started"
579 );
580
581 let mut shutdown_rx = self.shutdown_tx.subscribe();
583
584 tokio::select! {
585 _ = tokio::signal::ctrl_c() => {
586 tracing::debug!("Received ctrl-c");
587 }
588 _ = shutdown_rx.recv() => {
589 tracing::debug!("Received shutdown notification");
590 }
591 }
592
593 tracing::debug!("Graceful shutdown starting");
595
596 workflow_shutdown_token.cancel();
598
599 if let Err(e) = shutdown.shutdown().await {
600 tracing::warn!(error = %e, "Shutdown error");
601 }
602
603 if let Some(ref election) = leader_election {
605 election.stop();
606 }
607
608 if let Some(ref reactor) = reactor_handle {
610 reactor.stop();
611 }
612
613 if let Some(ref db) = self.db {
615 db.close().await;
616 }
617
618 forge_runtime::shutdown_telemetry();
619 tracing::info!("Forge stopped");
620 Ok(())
621 }
622
623 pub fn shutdown(&self) {
625 let _ = self.shutdown_tx.send(());
626 }
627}
628
629pub struct ForgeBuilder {
631 config: Option<ForgeConfig>,
632 function_registry: FunctionRegistry,
633 mcp_registry: McpToolRegistry,
634 job_registry: JobRegistry,
635 cron_registry: CronRegistry,
636 workflow_registry: WorkflowRegistry,
637 daemon_registry: DaemonRegistry,
638 webhook_registry: WebhookRegistry,
639 migrations_dir: PathBuf,
640 extra_migrations: Vec<Migration>,
641 frontend_handler: Option<FrontendHandler>,
642 custom_routes: Option<Router>,
643}
644
645impl ForgeBuilder {
646 pub fn new() -> Self {
648 Self {
649 config: None,
650 function_registry: FunctionRegistry::new(),
651 mcp_registry: McpToolRegistry::new(),
652 job_registry: JobRegistry::new(),
653 cron_registry: CronRegistry::new(),
654 workflow_registry: WorkflowRegistry::new(),
655 daemon_registry: DaemonRegistry::new(),
656 webhook_registry: WebhookRegistry::new(),
657 migrations_dir: PathBuf::from("migrations"),
658 extra_migrations: Vec::new(),
659 frontend_handler: None,
660 custom_routes: None,
661 }
662 }
663
664 pub fn migrations_dir(mut self, path: impl Into<PathBuf>) -> Self {
670 self.migrations_dir = path.into();
671 self
672 }
673
674 pub fn migration(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
679 self.extra_migrations.push(Migration::new(name, sql));
680 self
681 }
682
683 pub fn frontend_handler(&mut self, handler: FrontendHandler) {
688 self.frontend_handler = Some(handler);
689 }
690
691 pub fn custom_routes(&mut self, router: Router) {
706 self.custom_routes = Some(router);
707 }
708
709 pub fn config(mut self, config: ForgeConfig) -> Self {
711 self.config = Some(config);
712 self
713 }
714
715 pub fn function_registry_mut(&mut self) -> &mut FunctionRegistry {
717 &mut self.function_registry
718 }
719
720 pub fn job_registry_mut(&mut self) -> &mut JobRegistry {
722 &mut self.job_registry
723 }
724
725 pub fn mcp_registry_mut(&mut self) -> &mut McpToolRegistry {
727 &mut self.mcp_registry
728 }
729
730 pub fn register_mcp_tool<T: ForgeMcpTool>(&mut self) -> &mut Self {
732 self.mcp_registry.register::<T>();
733 self
734 }
735
736 pub fn cron_registry_mut(&mut self) -> &mut CronRegistry {
738 &mut self.cron_registry
739 }
740
741 pub fn workflow_registry_mut(&mut self) -> &mut WorkflowRegistry {
743 &mut self.workflow_registry
744 }
745
746 pub fn daemon_registry_mut(&mut self) -> &mut DaemonRegistry {
748 &mut self.daemon_registry
749 }
750
751 pub fn webhook_registry_mut(&mut self) -> &mut WebhookRegistry {
753 &mut self.webhook_registry
754 }
755
756 pub fn build(self) -> Result<Forge> {
758 let config = self
759 .config
760 .ok_or_else(|| ForgeError::Config("Configuration is required".to_string()))?;
761
762 let (shutdown_tx, _) = broadcast::channel(1);
763
764 Ok(Forge {
765 config,
766 db: None,
767 node_id: NodeId::new(),
768 function_registry: self.function_registry,
769 mcp_registry: self.mcp_registry,
770 job_registry: self.job_registry,
771 cron_registry: Arc::new(self.cron_registry),
772 workflow_registry: self.workflow_registry,
773 daemon_registry: Arc::new(self.daemon_registry),
774 webhook_registry: Arc::new(self.webhook_registry),
775 shutdown_tx,
776 migrations_dir: self.migrations_dir,
777 extra_migrations: self.extra_migrations,
778 frontend_handler: self.frontend_handler,
779 custom_routes: self.custom_routes,
780 })
781 }
782}
783
784impl Default for ForgeBuilder {
785 fn default() -> Self {
786 Self::new()
787 }
788}
789
790fn config_role_to_node_role(role: &ConfigNodeRole) -> NodeRole {
792 match role {
793 ConfigNodeRole::Gateway => NodeRole::Gateway,
794 ConfigNodeRole::Function => NodeRole::Function,
795 ConfigNodeRole::Worker => NodeRole::Worker,
796 ConfigNodeRole::Scheduler => NodeRole::Scheduler,
797 }
798}
799
800#[cfg(test)]
801#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
802mod tests {
803 use super::*;
804 use std::future::Future;
805 use std::pin::Pin;
806
807 use forge_core::mcp::{McpToolAnnotations, McpToolInfo};
808
809 struct TestMcpTool;
810
811 impl ForgeMcpTool for TestMcpTool {
812 type Args = serde_json::Value;
813 type Output = serde_json::Value;
814
815 fn info() -> McpToolInfo {
816 McpToolInfo {
817 name: "test.mcp.tool",
818 title: None,
819 description: None,
820 required_role: None,
821 is_public: false,
822 timeout: None,
823 rate_limit_requests: None,
824 rate_limit_per_secs: None,
825 rate_limit_key: None,
826 annotations: McpToolAnnotations::default(),
827 icons: &[],
828 }
829 }
830
831 fn execute(
832 _ctx: &forge_core::McpToolContext,
833 _args: Self::Args,
834 ) -> Pin<Box<dyn Future<Output = forge_core::Result<Self::Output>> + Send + '_>> {
835 Box::pin(async { Ok(serde_json::json!({ "ok": true })) })
836 }
837 }
838
839 #[test]
840 fn test_forge_builder_new() {
841 let builder = ForgeBuilder::new();
842 assert!(builder.config.is_none());
843 }
844
845 #[test]
846 fn test_forge_builder_requires_config() {
847 let builder = ForgeBuilder::new();
848 let result = builder.build();
849 assert!(result.is_err());
850 }
851
852 #[test]
853 fn test_forge_builder_with_config() {
854 let config = ForgeConfig::default_with_database_url("postgres://localhost/test");
855 let result = ForgeBuilder::new().config(config).build();
856 assert!(result.is_ok());
857 }
858
859 #[test]
860 fn test_forge_builder_register_mcp_tool() {
861 let mut builder = ForgeBuilder::new();
862 builder.register_mcp_tool::<TestMcpTool>();
863 assert_eq!(builder.mcp_registry.len(), 1);
864 }
865
866 #[test]
867 fn test_config_role_conversion() {
868 assert_eq!(
869 config_role_to_node_role(&ConfigNodeRole::Gateway),
870 NodeRole::Gateway
871 );
872 assert_eq!(
873 config_role_to_node_role(&ConfigNodeRole::Worker),
874 NodeRole::Worker
875 );
876 assert_eq!(
877 config_role_to_node_role(&ConfigNodeRole::Scheduler),
878 NodeRole::Scheduler
879 );
880 assert_eq!(
881 config_role_to_node_role(&ConfigNodeRole::Function),
882 NodeRole::Function
883 );
884 }
885}