1use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use folk_api::{Plugin, PluginContext};
10use tokio::sync::watch;
11use tracing::{info, warn};
12
13use crate::config::FolkConfig;
14use crate::health_registry::HealthRegistryImpl;
15use crate::logging;
16use crate::metrics_registry::MetricsRegistryImpl;
17use crate::plugin_registry::PluginRegistry;
18use crate::rpc_registry::RpcRegistry;
19use crate::rpc_server;
20use crate::runtime::Runtime;
21use crate::worker_pool::WorkerPool;
22
/// Top-level server object: owns the configuration, the execution runtime,
/// and the set of registered plugins. Construct with [`FolkServer::new`],
/// add plugins via [`FolkServer::register_plugin`], then consume with `run`.
pub struct FolkServer {
    /// Parsed server configuration (logging, workers, RPC socket, timeouts).
    config: FolkConfig,
    /// Execution runtime abstraction; shared with the worker pool via `Arc`.
    runtime: Arc<dyn Runtime>,
    /// Plugins registered before `run`; booted and shut down as a group.
    plugins: PluginRegistry,
}
29
30impl FolkServer {
31 pub fn new(config: FolkConfig, runtime: Arc<dyn Runtime>) -> Self {
36 Self {
37 config,
38 runtime,
39 plugins: PluginRegistry::new(),
40 }
41 }
42
43 pub fn register_plugin(&mut self, plugin: Box<dyn Plugin>) {
45 self.plugins.register(plugin);
46 }
47
48 pub async fn run(mut self) -> Result<()> {
59 let _ = logging::init(&self.config.log); info!(
63 version = folk_api::FOLK_API_VERSION,
64 workers = self.config.workers.count,
65 runtime = ?self.config.server.runtime,
66 "folk server starting"
67 );
68
69 let (shutdown_tx, shutdown_rx) = watch::channel(false);
71
72 let rpc_registry = RpcRegistry::new();
74 let health_registry = HealthRegistryImpl::new();
75 let metrics_registry = MetricsRegistryImpl::new();
76
77 let pool = WorkerPool::new(self.runtime.clone(), self.config.workers.clone())
79 .context("failed to start worker pool")?;
80
81 info!("worker pool started");
82
83 let ctx = PluginContext {
85 executor: pool.clone(),
86 shutdown: shutdown_rx.clone(),
87 rpc_registrar: Some(rpc_registry.clone()),
88 health_registry: Some(health_registry.clone()),
89 metrics_registry: Some(metrics_registry.clone()),
90 };
91
92 self.plugins
94 .boot_all(&ctx)
95 .await
96 .context("plugin boot failed")?;
97
98 info!(plugins = ?self.plugins.names(), "all plugins booted");
99
100 let rpc_path = self.config.server.rpc_socket.clone();
102 let rpc_reg = rpc_registry.clone();
103 let rpc_sd = shutdown_rx.clone();
104 let rpc_task = tokio::spawn(async move {
105 if let Err(e) = rpc_server::run_rpc_server(&rpc_path, rpc_reg, rpc_sd).await {
106 warn!(error = ?e, "admin RPC server error");
107 }
108 });
109
110 wait_for_signal().await;
112 info!("shutdown signal received; draining");
113
114 let _ = shutdown_tx.send(true);
116
117 let timeout = self.config.server.shutdown_timeout;
119 let shutdown_result =
120 tokio::time::timeout(timeout, async { self.plugins.shutdown_all().await }).await;
121
122 if shutdown_result.is_err() {
123 warn!(?timeout, "graceful shutdown timed out; forcing");
124 }
125
126 rpc_task.abort();
127 info!("folk server stopped");
128 Ok(())
129 }
130}
131
132#[cfg(not(target_os = "windows"))]
133async fn wait_for_signal() {
134 use tokio::signal::unix::{SignalKind, signal};
135 let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
136 let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
137 tokio::select! {
138 _ = sigterm.recv() => { info!("received SIGTERM"); },
139 _ = sigint.recv() => { info!("received SIGINT"); },
140 }
141}
142
#[cfg(target_os = "windows")]
/// Blocks until Ctrl-C is received. Windows has no POSIX signals, so this
/// is the only shutdown trigger on that platform.
///
/// # Panics
///
/// Panics if the Ctrl-C handler cannot be installed.
async fn wait_for_signal() {
    let ctrl_c = tokio::signal::ctrl_c();
    ctrl_c.await.expect("ctrl-c");
}