1use std::sync::Arc;
6
7use anyhow::{Context, Result};
8use folk_api::{Plugin, PluginContext, RpcRegistrar};
9use tokio::sync::watch;
10use tracing::{info, warn};
11
12use crate::config::FolkConfig;
13use crate::health_registry::HealthRegistryImpl;
14use crate::logging;
15use crate::metrics_registry::MetricsRegistryImpl;
16use crate::plugin_registry::PluginRegistry;
17use crate::runtime::Runtime;
18use crate::worker_pool::WorkerPool;
19
20pub struct FolkServer {
22 config: FolkConfig,
23 runtime: Arc<dyn Runtime>,
24 plugins: PluginRegistry,
25 rpc_registrar: Option<Arc<dyn RpcRegistrar>>,
26}
27
28impl FolkServer {
29 pub fn new(config: FolkConfig, runtime: Arc<dyn Runtime>) -> Self {
31 Self {
32 config,
33 runtime,
34 plugins: PluginRegistry::new(),
35 rpc_registrar: None,
36 }
37 }
38
39 pub fn set_rpc_registrar(&mut self, registrar: Arc<dyn RpcRegistrar>) {
42 self.rpc_registrar = Some(registrar);
43 }
44
45 pub fn register_plugin(&mut self, plugin: Box<dyn Plugin>) {
47 self.plugins.register(plugin);
48 }
49
50 pub async fn run(mut self) -> Result<()> {
52 let _ = logging::init(&self.config.log);
53
54 info!(
55 version = folk_api::FOLK_API_VERSION,
56 workers = self.config.workers.count,
57 "folk server starting"
58 );
59
60 let (shutdown_tx, shutdown_rx) = watch::channel(false);
61
62 let health_registry = HealthRegistryImpl::new();
63 let metrics_registry = MetricsRegistryImpl::new();
64
65 if self.config.workers.warmup {
66 match self.runtime.warmup().await {
67 Ok(()) => {},
68 Err(e) => warn!(error = %e, "opcache warmup failed, skipping"),
69 }
70 }
71
72 let pool = WorkerPool::new(self.runtime.clone(), self.config.workers.clone())
73 .context("failed to start worker pool")?;
74
75 info!("worker pool started");
76
77 let _watch_guard = if self.config.dev.watch {
80 if self.config.workers.count <= 1 {
81 warn!(
82 "hot reload enabled with workers.count <= 1: the main PHP thread is not \
83 recyclable, so code changes will not be picked up. Set workers.count > 1."
84 );
85 }
86 match crate::watch::spawn(&self.config.dev, pool.clone()) {
87 Ok(guard) => Some(guard),
88 Err(e) => {
89 warn!(error = %e, "failed to start hot reload watcher; continuing without it");
90 None
91 },
92 }
93 } else {
94 None
95 };
96
97 let ctx = PluginContext {
98 executor: pool.clone(),
99 shutdown: shutdown_rx.clone(),
100 rpc_registrar: self.rpc_registrar.clone(),
101 health_registry: Some(health_registry.clone()),
102 metrics_registry: Some(metrics_registry.clone()),
103 };
104
105 self.plugins
106 .boot_all(&ctx)
107 .await
108 .context("plugin boot failed")?;
109
110 info!(plugins = ?self.plugins.names(), "all plugins booted");
111
112 wait_for_signal().await;
114 info!("shutdown signal received; draining");
115
116 let _ = shutdown_tx.send(true);
117
118 let timeout = self.config.server.shutdown_timeout;
119 let shutdown_result =
120 tokio::time::timeout(timeout, async { self.plugins.shutdown_all().await }).await;
121
122 if shutdown_result.is_err() {
123 warn!(?timeout, "graceful shutdown timed out; forcing");
124 }
125
126 info!("folk server stopped");
127 Ok(())
128 }
129}
130
131#[cfg(not(target_os = "windows"))]
132async fn wait_for_signal() {
133 use tokio::signal::unix::{SignalKind, signal};
134 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
135 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
136 tokio::select! {
137 _ = sigterm.recv() => { info!("received SIGTERM"); },
138 _ = sigint.recv() => { info!("received SIGINT"); },
139 }
140}
141
/// Completes once `tokio::signal::ctrl_c()` resolves.
///
/// # Panics
///
/// Panics with the message "ctrl-c" if `ctrl_c()` returns an error.
#[cfg(target_os = "windows")]
async fn wait_for_signal() {
    let outcome = tokio::signal::ctrl_c().await;
    outcome.expect("ctrl-c");
}