Skip to main content

jax_daemon/process/
mod.rs

1pub mod utils;
2
3use std::net::SocketAddr;
4use std::str::FromStr;
5use std::time::Duration;
6
7use futures::future::join_all;
8use tokio::sync::watch;
9use tokio::time::timeout;
10use tracing_subscriber::layer::SubscriberExt;
11use tracing_subscriber::util::SubscriberInitExt;
12use tracing_subscriber::{EnvFilter, Layer};
13
14const FINAL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
15
16use crate::http_server;
17use crate::{ServiceConfig, ServiceState};
18
19/// Handle for gracefully shutting down the daemon service.
20pub struct ShutdownHandle {
21    graceful_waiter: tokio::task::JoinHandle<()>,
22    handles: Vec<tokio::task::JoinHandle<()>>,
23    shutdown_tx: watch::Sender<()>,
24    #[cfg(feature = "fuse")]
25    state: ServiceState,
26}
27
28impl ShutdownHandle {
29    /// Block until the service shuts down (via signal or explicit shutdown).
30    pub async fn wait(self) {
31        // Stop all FUSE mounts before shutting down
32        #[cfg(feature = "fuse")]
33        {
34            tracing::info!("Stopping all FUSE mounts...");
35            let mount_manager = self.state.mount_manager().read().await;
36            if let Some(manager) = mount_manager.as_ref() {
37                if let Err(e) = manager.stop_all().await {
38                    tracing::error!("Failed to stop FUSE mounts: {}", e);
39                }
40            }
41        }
42
43        shutdown_and_join(self.graceful_waiter, self.handles).await;
44    }
45
46    /// Trigger shutdown programmatically (e.g. from Tauri quit).
47    pub fn shutdown(&self) {
48        let _ = self.shutdown_tx.send(());
49    }
50}
51
52/// Initialize logging, panic handler, and build info reporting.
53/// Returns guards that must be kept alive for the duration of the program.
54fn init_logging(
55    service_config: &ServiceConfig,
56) -> Vec<tracing_appender::non_blocking::WorkerGuard> {
57    use tracing_subscriber::fmt::format::FmtSpan;
58
59    let mut guards = Vec::new();
60
61    // Stdout layer
62    let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout());
63    guards.push(stdout_guard);
64
65    let stdout_env_filter = EnvFilter::builder()
66        .with_default_directive(service_config.log_level.into())
67        .from_env_lossy();
68
69    let stdout_layer = tracing_subscriber::fmt::layer()
70        .compact()
71        .with_writer(stdout_writer)
72        .with_filter(stdout_env_filter);
73
74    // File layer (if log_dir is set)
75    if let Some(log_dir) = &service_config.log_dir {
76        // Create the log directory if it doesn't exist
77        if let Err(e) = std::fs::create_dir_all(log_dir) {
78            eprintln!(
79                "Warning: Failed to create log directory {:?}: {}",
80                log_dir, e
81            );
82        }
83
84        let file_appender = tracing_appender::rolling::daily(log_dir, "jax.log");
85        let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender);
86        guards.push(file_guard);
87
88        let file_env_filter = EnvFilter::builder()
89            .with_default_directive(service_config.log_level.into())
90            .from_env_lossy();
91
92        let file_layer = tracing_subscriber::fmt::layer()
93            .with_writer(file_writer)
94            .with_ansi(false)
95            .with_span_events(FmtSpan::CLOSE)
96            .with_filter(file_env_filter);
97
98        tracing_subscriber::registry()
99            .with(stdout_layer)
100            .with(file_layer)
101            .init();
102    } else {
103        tracing_subscriber::registry().with(stdout_layer).init();
104    }
105
106    utils::register_panic_logger();
107    utils::report_build_info();
108
109    guards
110}
111
112/// Create service state from config, exiting on error.
113async fn create_state(service_config: &ServiceConfig) -> ServiceState {
114    match ServiceState::from_config(service_config).await {
115        Ok(state) => state,
116        Err(e) => {
117            tracing::error!("error creating server state: {}", e);
118            std::process::exit(3);
119        }
120    }
121}
122
123/// Wait for shutdown and join all handles with timeout.
124async fn shutdown_and_join(
125    graceful_waiter: tokio::task::JoinHandle<()>,
126    handles: Vec<tokio::task::JoinHandle<()>>,
127) {
128    let _ = graceful_waiter.await;
129
130    if timeout(FINAL_SHUTDOWN_TIMEOUT, join_all(handles))
131        .await
132        .is_err()
133    {
134        tracing::error!(
135            "Failed to shut down within {} seconds",
136            FINAL_SHUTDOWN_TIMEOUT.as_secs()
137        );
138        std::process::exit(4);
139    }
140}
141
142/// Create state and spawn background tasks, returning the state handle.
143///
144/// Use this when you need access to `ServiceState` (e.g. from Tauri IPC commands).
145/// The returned `ShutdownHandle` must be kept alive; dropping it does not stop the service.
146pub async fn start_service(service_config: &ServiceConfig) -> (ServiceState, ShutdownHandle) {
147    let (graceful_waiter, shutdown_tx, shutdown_rx) = utils::graceful_shutdown_blocker();
148    let state = create_state(service_config).await;
149
150    let mut handles = Vec::new();
151
152    // Always spawn peer
153    let peer = state.peer().clone();
154    let peer_rx = shutdown_rx.clone();
155    let peer_handle = tokio::spawn(async move {
156        if let Err(e) = common::peer::spawn(peer, peer_rx).await {
157            tracing::error!("Peer error: {}", e);
158        }
159    });
160    handles.push(peer_handle);
161
162    // Spawn API server
163    let api_port = service_config.api_port;
164    let api_addr = SocketAddr::from_str(&format!("0.0.0.0:{}", api_port))
165        .expect("Failed to parse API listen address");
166    let api_state = state.clone();
167    let api_config = http_server::Config::new(api_addr, service_config.gateway_url.clone());
168    let api_rx = shutdown_rx.clone();
169    let api_handle = tokio::spawn(async move {
170        if let Err(e) = http_server::run_api(api_config, api_state, api_rx).await {
171            tracing::error!("API server error: {}", e);
172        }
173    });
174    handles.push(api_handle);
175
176    // Spawn gateway server
177    let gw_port = service_config.gateway_port;
178    let gw_addr = SocketAddr::from_str(&format!("0.0.0.0:{}", gw_port))
179        .expect("Failed to parse gateway listen address");
180    let gw_state = state.clone();
181    let gw_config = http_server::Config::new(gw_addr, service_config.gateway_url.clone());
182    let gw_rx = shutdown_rx.clone();
183    let gw_handle = tokio::spawn(async move {
184        if let Err(e) = http_server::run_gateway(gw_config, gw_state, gw_rx).await {
185            tracing::error!("Gateway server error: {}", e);
186        }
187    });
188    handles.push(gw_handle);
189
190    tracing::info!(
191        "Running: Peer + API on port {} + Gateway on port {}",
192        api_port,
193        gw_port
194    );
195
196    // Start auto-mounts (with fuse feature)
197    #[cfg(feature = "fuse")]
198    {
199        let mount_state = state.clone();
200        tokio::spawn(async move {
201            // Small delay to ensure services are ready
202            tokio::time::sleep(Duration::from_millis(500)).await;
203
204            let mount_manager = mount_state.mount_manager().read().await;
205            if let Some(manager) = mount_manager.as_ref() {
206                if let Err(e) = manager.start_auto().await {
207                    tracing::error!("Failed to start auto-mounts: {}", e);
208                }
209            }
210        });
211    }
212
213    let handle = ShutdownHandle {
214        graceful_waiter,
215        handles,
216        shutdown_tx,
217        #[cfg(feature = "fuse")]
218        state: state.clone(),
219    };
220
221    (state.clone(), handle)
222}
223
224/// Spawns the daemon service: P2P peer + API server + gateway server.
225/// Blocks until shutdown signal is received. Use for CLI binary usage.
226pub async fn spawn_service(service_config: &ServiceConfig) {
227    let _guards = init_logging(service_config);
228    let (_, handle) = start_service(service_config).await;
229    handle.wait().await;
230}