jax_daemon/process/
mod.rs1pub mod utils;
2
3use std::net::SocketAddr;
4use std::str::FromStr;
5use std::time::Duration;
6
7use futures::future::join_all;
8use tokio::sync::watch;
9use tokio::time::timeout;
10use tracing_subscriber::layer::SubscriberExt;
11use tracing_subscriber::util::SubscriberInitExt;
12use tracing_subscriber::{EnvFilter, Layer};
13
/// Upper bound on how long `shutdown_and_join` waits for the spawned service
/// tasks to finish before it logs an error and exits the process.
const FINAL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
15
16use crate::http_server;
17use crate::{ServiceConfig, ServiceState};
18
19pub struct ShutdownHandle {
21 graceful_waiter: tokio::task::JoinHandle<()>,
22 handles: Vec<tokio::task::JoinHandle<()>>,
23 shutdown_tx: watch::Sender<()>,
24 #[cfg(feature = "fuse")]
25 state: ServiceState,
26}
27
28impl ShutdownHandle {
29 pub async fn wait(self) {
31 #[cfg(feature = "fuse")]
33 {
34 tracing::info!("Stopping all FUSE mounts...");
35 let mount_manager = self.state.mount_manager().read().await;
36 if let Some(manager) = mount_manager.as_ref() {
37 if let Err(e) = manager.stop_all().await {
38 tracing::error!("Failed to stop FUSE mounts: {}", e);
39 }
40 }
41 }
42
43 shutdown_and_join(self.graceful_waiter, self.handles).await;
44 }
45
46 pub fn shutdown(&self) {
48 let _ = self.shutdown_tx.send(());
49 }
50}
51
52fn init_logging(
55 service_config: &ServiceConfig,
56) -> Vec<tracing_appender::non_blocking::WorkerGuard> {
57 use tracing_subscriber::fmt::format::FmtSpan;
58
59 let mut guards = Vec::new();
60
61 let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
63 guards.push(stdout_guard);
64
65 let stdout_env_filter = EnvFilter::builder()
66 .with_default_directive(service_config.log_level.into())
67 .from_env_lossy();
68
69 let stdout_layer = tracing_subscriber::fmt::layer()
70 .compact()
71 .with_writer(stdout_writer)
72 .with_filter(stdout_env_filter);
73
74 if let Some(log_dir) = &service_config.log_dir {
76 if let Err(e) = std::fs::create_dir_all(log_dir) {
78 eprintln!(
79 "Warning: Failed to create log directory {:?}: {}",
80 log_dir, e
81 );
82 }
83
84 let file_appender = tracing_appender::rolling::daily(log_dir, "jax.log");
85 let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender);
86 guards.push(file_guard);
87
88 let file_env_filter = EnvFilter::builder()
89 .with_default_directive(service_config.log_level.into())
90 .from_env_lossy();
91
92 let file_layer = tracing_subscriber::fmt::layer()
93 .with_writer(file_writer)
94 .with_ansi(false)
95 .with_span_events(FmtSpan::CLOSE)
96 .with_filter(file_env_filter);
97
98 tracing_subscriber::registry()
99 .with(stdout_layer)
100 .with(file_layer)
101 .init();
102 } else {
103 tracing_subscriber::registry().with(stdout_layer).init();
104 }
105
106 utils::register_panic_logger();
107 utils::report_build_info();
108
109 guards
110}
111
112async fn create_state(service_config: &ServiceConfig) -> ServiceState {
114 match ServiceState::from_config(service_config).await {
115 Ok(state) => state,
116 Err(e) => {
117 tracing::error!("error creating server state: {}", e);
118 std::process::exit(3);
119 }
120 }
121}
122
123async fn shutdown_and_join(
125 graceful_waiter: tokio::task::JoinHandle<()>,
126 handles: Vec<tokio::task::JoinHandle<()>>,
127) {
128 let _ = graceful_waiter.await;
129
130 if timeout(FINAL_SHUTDOWN_TIMEOUT, join_all(handles))
131 .await
132 .is_err()
133 {
134 tracing::error!(
135 "Failed to shut down within {} seconds",
136 FINAL_SHUTDOWN_TIMEOUT.as_secs()
137 );
138 std::process::exit(4);
139 }
140}
141
142pub async fn start_service(service_config: &ServiceConfig) -> (ServiceState, ShutdownHandle) {
147 let (graceful_waiter, shutdown_tx, shutdown_rx) = utils::graceful_shutdown_blocker();
148 let state = create_state(service_config).await;
149
150 let mut handles = Vec::new();
151
152 let peer = state.peer().clone();
154 let peer_rx = shutdown_rx.clone();
155 let peer_handle = tokio::spawn(async move {
156 if let Err(e) = common::peer::spawn(peer, peer_rx).await {
157 tracing::error!("Peer error: {}", e);
158 }
159 });
160 handles.push(peer_handle);
161
162 let api_port = service_config.api_port;
164 let api_addr = SocketAddr::from_str(&format!("0.0.0.0:{}", api_port))
165 .expect("Failed to parse API listen address");
166 let api_state = state.clone();
167 let api_config = http_server::Config::new(api_addr, service_config.gateway_url.clone());
168 let api_rx = shutdown_rx.clone();
169 let api_handle = tokio::spawn(async move {
170 if let Err(e) = http_server::run_api(api_config, api_state, api_rx).await {
171 tracing::error!("API server error: {}", e);
172 }
173 });
174 handles.push(api_handle);
175
176 let gw_port = service_config.gateway_port;
178 let gw_addr = SocketAddr::from_str(&format!("0.0.0.0:{}", gw_port))
179 .expect("Failed to parse gateway listen address");
180 let gw_state = state.clone();
181 let gw_config = http_server::Config::new(gw_addr, service_config.gateway_url.clone());
182 let gw_rx = shutdown_rx.clone();
183 let gw_handle = tokio::spawn(async move {
184 if let Err(e) = http_server::run_gateway(gw_config, gw_state, gw_rx).await {
185 tracing::error!("Gateway server error: {}", e);
186 }
187 });
188 handles.push(gw_handle);
189
190 tracing::info!(
191 "Running: Peer + API on port {} + Gateway on port {}",
192 api_port,
193 gw_port
194 );
195
196 #[cfg(feature = "fuse")]
198 {
199 let mount_state = state.clone();
200 tokio::spawn(async move {
201 tokio::time::sleep(Duration::from_millis(500)).await;
203
204 let mount_manager = mount_state.mount_manager().read().await;
205 if let Some(manager) = mount_manager.as_ref() {
206 if let Err(e) = manager.start_auto().await {
207 tracing::error!("Failed to start auto-mounts: {}", e);
208 }
209 }
210 });
211 }
212
213 let handle = ShutdownHandle {
214 graceful_waiter,
215 handles,
216 shutdown_tx,
217 #[cfg(feature = "fuse")]
218 state: state.clone(),
219 };
220
221 (state.clone(), handle)
222}
223
224pub async fn spawn_service(service_config: &ServiceConfig) {
227 let _guards = init_logging(service_config);
228 let (_, handle) = start_service(service_config).await;
229 handle.wait().await;
230}