Skip to main content

folk_core/
server.rs

1//! `FolkServer`: the lifecycle owner for the Folk application server.
2//!
3//! Constructed by `folk-ext` (PHP extension) or in tests with `MockRuntime`.
4
5use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use folk_api::{Plugin, PluginContext, RpcRegistrar};
9use tokio::sync::watch;
10use tracing::{info, warn};
11
12use crate::config::FolkConfig;
13use crate::health_registry::HealthRegistryImpl;
14use crate::logging;
15use crate::metrics_registry::MetricsRegistryImpl;
16use crate::plugin_registry::PluginRegistry;
17use crate::runtime::Runtime;
18use crate::worker_pool::WorkerPool;
19
20/// The Folk application server.
21pub struct FolkServer {
22    config: FolkConfig,
23    runtime: Arc<dyn Runtime>,
24    plugins: PluginRegistry,
25    rpc_registrar: Option<Arc<dyn RpcRegistrar>>,
26}
27
28impl FolkServer {
29    /// Construct a server with the given config and runtime.
30    pub fn new(config: FolkConfig, runtime: Arc<dyn Runtime>) -> Self {
31        Self {
32            config,
33            runtime,
34            plugins: PluginRegistry::new(),
35            rpc_registrar: None,
36        }
37    }
38
39    /// Set an in-process RPC registrar for plugin method registration.
40    /// Plugins will register their handlers here during boot.
41    pub fn set_rpc_registrar(&mut self, registrar: Arc<dyn RpcRegistrar>) {
42        self.rpc_registrar = Some(registrar);
43    }
44
45    /// Register a plugin. Call before `run`.
46    pub fn register_plugin(&mut self, plugin: Box<dyn Plugin>) {
47        self.plugins.register(plugin);
48    }
49
50    /// Run the server until SIGTERM/SIGINT.
51    pub async fn run(mut self) -> Result<()> {
52        let _ = logging::init(&self.config.log);
53
54        self.config.workers.normalize();
55
56        info!(
57            version = folk_api::FOLK_API_VERSION,
58            workers = self.config.workers.count,
59            "folk server starting"
60        );
61
62        let (shutdown_tx, shutdown_rx) = watch::channel(false);
63
64        let health_registry = HealthRegistryImpl::new();
65        let metrics_registry = MetricsRegistryImpl::new();
66
67        if self.config.workers.warmup {
68            match self.runtime.warmup().await {
69                Ok(()) => {},
70                Err(e) => warn!(error = %e, "opcache warmup failed, skipping"),
71            }
72        }
73
74        let pool = WorkerPool::new(self.runtime.clone(), self.config.workers.clone())
75            .context("failed to start worker pool")?;
76
77        info!("worker pool started");
78
79        // Dev-mode hot reload: watch PHP files and recycle workers on change.
80        // Keep the guard alive for the lifetime of the server.
81        let _watch_guard = if self.config.dev.watch {
82            if self.config.workers.count <= 1 {
83                warn!(
84                    "hot reload enabled with workers.count <= 1: the main PHP thread is not \
85                     recyclable, so code changes will not be picked up. Set workers.count > 1."
86                );
87            }
88            match crate::watch::spawn(&self.config.dev, pool.clone()) {
89                Ok(guard) => Some(guard),
90                Err(e) => {
91                    warn!(error = %e, "failed to start hot reload watcher; continuing without it");
92                    None
93                },
94            }
95        } else {
96            None
97        };
98
99        let ctx = PluginContext {
100            executor: pool.clone(),
101            shutdown: shutdown_rx.clone(),
102            rpc_registrar: self.rpc_registrar.clone(),
103            health_registry: Some(health_registry.clone()),
104            metrics_registry: Some(metrics_registry.clone()),
105        };
106
107        self.plugins
108            .boot_all(&ctx)
109            .await
110            .context("plugin boot failed")?;
111
112        info!(plugins = ?self.plugins.names(), "all plugins booted");
113
114        // Wait for shutdown signal.
115        wait_for_signal().await;
116        info!("shutdown signal received; draining");
117
118        let _ = shutdown_tx.send(true);
119
120        let timeout = self.config.server.shutdown_timeout;
121        let shutdown_result =
122            tokio::time::timeout(timeout, async { self.plugins.shutdown_all().await }).await;
123
124        if shutdown_result.is_err() {
125            warn!(?timeout, "graceful shutdown timed out; forcing");
126        }
127
128        info!("folk server stopped");
129        Ok(())
130    }
131}
132
133#[cfg(not(target_os = "windows"))]
134async fn wait_for_signal() {
135    use tokio::signal::unix::{SignalKind, signal};
136    let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
137    let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
138    tokio::select! {
139        _ = sigterm.recv() => { info!("received SIGTERM"); },
140        _ = sigint.recv() => { info!("received SIGINT"); },
141    }
142}
143
/// Wait until Ctrl-C is delivered to the process (Windows build).
///
/// # Panics
///
/// Panics if waiting for the Ctrl-C notification reports an error.
#[cfg(target_os = "windows")]
async fn wait_for_signal() {
    let outcome = tokio::signal::ctrl_c().await;
    outcome.expect("ctrl-c");
}