Skip to main content

folk_core/
server.rs

1//! `FolkServer`: the lifecycle owner for the Folk application server.
2//!
3//! Typically constructed by the `folk` binary (phase 5) or by `folk-builder`-
4//! generated binaries. In tests, use `FolkServer::new(config, mock_runtime)`.
5
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use folk_api::{Plugin, PluginContext};
10use tokio::sync::watch;
11use tracing::{info, warn};
12
13use crate::config::FolkConfig;
14use crate::health_registry::HealthRegistryImpl;
15use crate::logging;
16use crate::metrics_registry::MetricsRegistryImpl;
17use crate::plugin_registry::PluginRegistry;
18use crate::rpc_registry::RpcRegistry;
19use crate::rpc_server;
20use crate::runtime::Runtime;
21use crate::worker_pool::WorkerPool;
22
23/// The Folk application server.
24pub struct FolkServer {
25    config: FolkConfig,
26    runtime: Arc<dyn Runtime>,
27    plugins: PluginRegistry,
28}
29
30impl FolkServer {
31    /// Construct a server with the given config and runtime.
32    ///
33    /// In production, the runtime is `folk_runtime_pipe::PipeRuntime`
34    /// (phase 4). For tests, pass `MockRuntime::echo()`.
35    pub fn new(config: FolkConfig, runtime: Arc<dyn Runtime>) -> Self {
36        Self {
37            config,
38            runtime,
39            plugins: PluginRegistry::new(),
40        }
41    }
42
43    /// Register a plugin. Call before `run`.
44    pub fn register_plugin(&mut self, plugin: Box<dyn Plugin>) {
45        self.plugins.register(plugin);
46    }
47
48    /// Run the server until SIGTERM (or SIGINT on dev builds).
49    ///
50    /// This method:
51    /// 1. Initializes logging.
52    /// 2. Spawns the worker pool.
53    /// 3. Boots all registered plugins.
54    /// 4. Starts the admin RPC server.
55    /// 5. Waits for SIGTERM.
56    /// 6. Gracefully shuts down: RPC server, plugins (in reverse), pool.
57    /// 7. Returns `Ok(())` on clean exit.
58    pub async fn run(mut self) -> Result<()> {
59        // 1. Logging.
60        let _ = logging::init(&self.config.log); // ignore reinit errors in tests
61
62        info!(
63            version = folk_api::FOLK_API_VERSION,
64            workers = self.config.workers.count,
65            runtime = ?self.config.server.runtime,
66            "folk server starting"
67        );
68
69        // 2. Shutdown broadcast.
70        let (shutdown_tx, shutdown_rx) = watch::channel(false);
71
72        // 3. Registries.
73        let rpc_registry = RpcRegistry::new();
74        let health_registry = HealthRegistryImpl::new();
75        let metrics_registry = MetricsRegistryImpl::new();
76
77        // 4. Worker pool.
78        let pool = WorkerPool::new(self.runtime.clone(), self.config.workers.clone())
79            .context("failed to start worker pool")?;
80
81        info!("worker pool started");
82
83        // 5. Plugin context.
84        let ctx = PluginContext {
85            executor: pool.clone(),
86            shutdown: shutdown_rx.clone(),
87            rpc_registrar: Some(rpc_registry.clone()),
88            health_registry: Some(health_registry.clone()),
89            metrics_registry: Some(metrics_registry.clone()),
90        };
91
92        // 6. Boot plugins.
93        self.plugins
94            .boot_all(&ctx)
95            .await
96            .context("plugin boot failed")?;
97
98        info!(plugins = ?self.plugins.names(), "all plugins booted");
99
100        // 7. Start admin RPC server.
101        let rpc_path = self.config.server.rpc_socket.clone();
102        let rpc_reg = rpc_registry.clone();
103        let rpc_sd = shutdown_rx.clone();
104        let rpc_task = tokio::spawn(async move {
105            if let Err(e) = rpc_server::run_rpc_server(&rpc_path, rpc_reg, rpc_sd).await {
106                warn!(error = ?e, "admin RPC server error");
107            }
108        });
109
110        // 8. Wait for shutdown signal.
111        wait_for_signal().await;
112        info!("shutdown signal received; draining");
113
114        // 9. Signal all components.
115        let _ = shutdown_tx.send(true);
116
117        // 10. Shutdown timeout.
118        let timeout = self.config.server.shutdown_timeout;
119        let shutdown_result =
120            tokio::time::timeout(timeout, async { self.plugins.shutdown_all().await }).await;
121
122        if shutdown_result.is_err() {
123            warn!(?timeout, "graceful shutdown timed out; forcing");
124        }
125
126        rpc_task.abort();
127        info!("folk server stopped");
128        Ok(())
129    }
130}
131
132#[cfg(not(target_os = "windows"))]
133async fn wait_for_signal() {
134    use tokio::signal::unix::{SignalKind, signal};
135    let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
136    let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
137    tokio::select! {
138        _ = sigterm.recv() => { info!("received SIGTERM"); },
139        _ = sigint.recv() => { info!("received SIGINT"); },
140    }
141}
142
143#[cfg(target_os = "windows")]
144async fn wait_for_signal() {
145    tokio::signal::ctrl_c().await.expect("ctrl-c");
146}