Skip to main content

fips_core/control/
mod.rs

//! Control socket for runtime management and observability.
//!
//! Provides a control interface that accepts commands and returns
//! structured JSON responses. Supports both read-only queries (show_*)
//! and mutating commands (connect, disconnect).
//!
//! Platform-specific implementations:
//! - Unix: uses a Unix domain socket for local IPC
//! - Windows: uses a TCP socket bound to localhost (127.0.0.1)
10
11pub mod commands;
12pub mod protocol;
13pub mod queries;
14
15use crate::config::ControlConfig;
16use protocol::{Request, Response};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::sync::{mpsc, oneshot};
19use tracing::{debug, info, warn};
20
/// Upper bound on the size of a single request line, in bytes (4 KiB).
const MAX_REQUEST_SIZE: usize = 4 * 1024;

/// Deadline applied to each I/O phase of a client connection.
const IO_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
26
27/// A message sent from the accept loop to the main event loop.
28pub type ControlMessage = (Request, oneshot::Sender<Response>);
29
30/// Handle a single client connection over any AsyncRead + AsyncWrite stream.
31///
32/// Shared between Unix and Windows implementations to avoid duplicating
33/// the request/response protocol logic.
34async fn handle_connection_generic<S>(
35    stream: S,
36    control_tx: mpsc::Sender<ControlMessage>,
37) -> Result<(), Box<dyn std::error::Error>>
38where
39    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
40{
41    let (reader, mut writer) = tokio::io::split(stream);
42    let mut buf_reader = BufReader::new(reader);
43    let mut line = String::new();
44
45    // Read one line with timeout and size limit
46    let read_result = tokio::time::timeout(IO_TIMEOUT, async {
47        let mut total = 0usize;
48        loop {
49            let n = buf_reader.read_line(&mut line).await?;
50            if n == 0 {
51                break; // EOF
52            }
53            total += n;
54            if total > MAX_REQUEST_SIZE {
55                return Err(std::io::Error::new(
56                    std::io::ErrorKind::InvalidData,
57                    "request too large",
58                ));
59            }
60            if line.ends_with('\n') {
61                break;
62            }
63        }
64        Ok(())
65    })
66    .await;
67
68    let response = match read_result {
69        Ok(Ok(())) if line.is_empty() => Response::error("empty request"),
70        Ok(Ok(())) => {
71            // Parse the request
72            match serde_json::from_str::<Request>(line.trim()) {
73                Ok(request) => {
74                    // Send to main loop and wait for response
75                    let (resp_tx, resp_rx) = oneshot::channel();
76                    if control_tx.send((request, resp_tx)).await.is_err() {
77                        Response::error("node shutting down")
78                    } else {
79                        match tokio::time::timeout(IO_TIMEOUT, resp_rx).await {
80                            Ok(Ok(resp)) => resp,
81                            Ok(Err(_)) => Response::error("response channel closed"),
82                            Err(_) => Response::error("query timeout"),
83                        }
84                    }
85                }
86                Err(e) => Response::error(format!("invalid request: {}", e)),
87            }
88        }
89        Ok(Err(e)) => Response::error(format!("read error: {}", e)),
90        Err(_) => Response::error("read timeout"),
91    };
92
93    // Write response with timeout
94    let json = serde_json::to_string(&response)?;
95    let write_result = tokio::time::timeout(IO_TIMEOUT, async {
96        writer.write_all(json.as_bytes()).await?;
97        writer.write_all(b"\n").await?;
98        writer.shutdown().await?;
99        Ok::<_, std::io::Error>(())
100    })
101    .await;
102
103    if let Err(_) | Ok(Err(_)) = write_result {
104        debug!("Control socket write failed or timed out");
105    }
106
107    Ok(())
108}
109
110// ============================================================================
111// Unix implementation
112// ============================================================================
113
114#[cfg(unix)]
115mod unix_impl {
116    use super::*;
117    use std::path::{Path, PathBuf};
118    use tokio::net::UnixListener;
119
120    /// Control socket listener (Unix domain socket).
121    ///
122    /// Manages the Unix domain socket lifecycle: bind, accept, cleanup.
123    pub struct ControlSocket {
124        listener: UnixListener,
125        socket_path: PathBuf,
126    }
127
128    impl ControlSocket {
129        /// Bind a new control socket.
130        ///
131        /// Creates parent directories if needed, removes stale socket files,
132        /// and binds the Unix listener.
133        pub fn bind(config: &ControlConfig) -> Result<Self, std::io::Error> {
134            let socket_path = PathBuf::from(&config.socket_path);
135
136            // Create parent directory if it doesn't exist
137            if let Some(parent) = socket_path.parent()
138                && !parent.exists()
139            {
140                std::fs::create_dir_all(parent)?;
141                debug!(path = %parent.display(), "Created control socket directory");
142            }
143
144            // Remove stale socket if it exists
145            if socket_path.exists() {
146                Self::remove_stale_socket(&socket_path)?;
147            }
148
149            let listener = UnixListener::bind(&socket_path)?;
150
151            // Make the socket and its parent directory group-accessible so
152            // 'fips' group members can use fipsctl/fipstop without root.
153            use std::os::unix::fs::PermissionsExt;
154            std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(0o770))?;
155            Self::chown_to_fips_group(&socket_path);
156            if let Some(parent) = socket_path.parent() {
157                Self::chown_to_fips_group(parent);
158            }
159
160            info!(path = %socket_path.display(), "Control socket listening");
161
162            Ok(Self {
163                listener,
164                socket_path,
165            })
166        }
167
168        /// Remove a stale socket file.
169        ///
170        /// If the file exists but no one is listening, remove it so we can
171        /// bind. This handles unclean daemon exits.
172        fn remove_stale_socket(path: &Path) -> Result<(), std::io::Error> {
173            // Try connecting to see if someone is listening
174            match std::os::unix::net::UnixStream::connect(path) {
175                Ok(_) => {
176                    // Someone is listening — don't remove it
177                    Err(std::io::Error::new(
178                        std::io::ErrorKind::AddrInUse,
179                        format!("control socket already in use: {}", path.display()),
180                    ))
181                }
182                Err(_) => {
183                    // No one listening — remove the stale socket
184                    debug!(path = %path.display(), "Removing stale control socket");
185                    std::fs::remove_file(path)?;
186                    Ok(())
187                }
188            }
189        }
190
191        /// Set group ownership of a path to the 'fips' group (best-effort).
192        fn chown_to_fips_group(path: &Path) {
193            use std::ffi::CString;
194            use std::os::unix::ffi::OsStrExt;
195
196            // Look up the 'fips' group
197            let group_name = CString::new("fips").unwrap();
198            let grp = unsafe { libc::getgrnam(group_name.as_ptr()) };
199            if grp.is_null() {
200                debug!(
201                    "'fips' group not found, skipping chown for {}",
202                    path.display()
203                );
204                return;
205            }
206            let gid = unsafe { (*grp).gr_gid };
207
208            let c_path = match CString::new(path.as_os_str().as_bytes()) {
209                Ok(p) => p,
210                Err(_) => return,
211            };
212            let ret = unsafe { libc::chown(c_path.as_ptr(), u32::MAX, gid) };
213            if ret != 0 {
214                warn!(
215                    path = %path.display(),
216                    error = %std::io::Error::last_os_error(),
217                    "Failed to chown control socket to 'fips' group"
218                );
219            }
220        }
221
222        /// Run the accept loop, forwarding requests to the main event loop via mpsc.
223        ///
224        /// Each accepted connection is handled in a spawned task:
225        /// 1. Read one line of JSON (the request)
226        /// 2. Send (Request, oneshot::Sender) to the main loop
227        /// 3. Wait for the response via oneshot
228        /// 4. Write the response as one line of JSON
229        /// 5. Close the connection
230        pub async fn accept_loop(self, control_tx: mpsc::Sender<ControlMessage>) {
231            loop {
232                let (stream, _addr) = match self.listener.accept().await {
233                    Ok(conn) => conn,
234                    Err(e) => {
235                        warn!(error = %e, "Control socket accept failed");
236                        continue;
237                    }
238                };
239
240                let tx = control_tx.clone();
241                tokio::spawn(async move {
242                    if let Err(e) = handle_connection_generic(stream, tx).await {
243                        debug!(error = %e, "Control connection error");
244                    }
245                });
246            }
247        }
248
249        /// Get the socket path.
250        pub fn socket_path(&self) -> &Path {
251            &self.socket_path
252        }
253
254        /// Clean up the socket file.
255        fn cleanup(&self) {
256            if self.socket_path.exists() {
257                if let Err(e) = std::fs::remove_file(&self.socket_path) {
258                    warn!(
259                        path = %self.socket_path.display(),
260                        error = %e,
261                        "Failed to remove control socket"
262                    );
263                } else {
264                    debug!(path = %self.socket_path.display(), "Control socket removed");
265                }
266            }
267        }
268    }
269
270    impl Drop for ControlSocket {
271        fn drop(&mut self) {
272            self.cleanup();
273        }
274    }
275}
276
277// ============================================================================
278// Windows implementation (TCP on localhost)
279// ============================================================================
280
#[cfg(windows)]
mod windows_impl {
    use super::*;
    use tokio::net::TcpListener;

    /// Default TCP port for the control socket on Windows.
    const DEFAULT_CONTROL_PORT: u16 = 21210;

    /// Pause after a failed `accept()` so a persistent error does not turn
    /// the accept loop into a busy loop that floods the log.
    const ACCEPT_RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

    /// Control socket listener (Windows TCP on localhost).
    ///
    /// On Windows, the control socket uses a TCP listener bound to
    /// `127.0.0.1` since Windows does not support Unix domain sockets
    /// reliably. Only localhost connections are accepted.
    ///
    /// Note: Unlike Unix domain sockets, TCP does not provide filesystem-level
    /// ACLs. Any local user can connect to the control port. This is acceptable
    /// for single-user Windows installations but should be documented.
    pub struct ControlSocket {
        listener: TcpListener,
        /// Port the listener is actually bound to.
        port: u16,
    }

    impl ControlSocket {
        /// Bind a TCP control socket on localhost.
        ///
        /// Parses the port from `config.socket_path` (which is a port number
        /// string on Windows, e.g. "21210"). Falls back to the default port
        /// with a warning if parsing fails. Port `0` asks the OS for an
        /// ephemeral port; the port it assigns is what `port()` reports.
        ///
        /// # Errors
        ///
        /// Returns an I/O error if the address cannot be bound or the listener
        /// cannot be registered with the runtime.
        pub fn bind(config: &ControlConfig) -> Result<Self, std::io::Error> {
            let requested_port: u16 = match config.socket_path.parse() {
                Ok(p) => p,
                Err(e) => {
                    warn!(
                        path = %config.socket_path,
                        error = %e,
                        default = DEFAULT_CONTROL_PORT,
                        "Invalid control port, using default"
                    );
                    DEFAULT_CONTROL_PORT
                }
            };

            let addr = std::net::SocketAddr::from(([127, 0, 0, 1], requested_port));
            let std_listener = std::net::TcpListener::bind(addr)?;
            // tokio requires the std listener to be in non-blocking mode.
            std_listener.set_nonblocking(true)?;
            let listener = TcpListener::from_std(std_listener)?;

            // Ask the socket which port it got. When port 0 was requested the
            // OS picks one, and storing the requested value would make
            // `port()` and the log line report 0.
            let port = listener.local_addr()?.port();

            info!(port = port, "Control socket listening on localhost");

            Ok(Self { listener, port })
        }

        /// Get the listening port.
        pub fn port(&self) -> u16 {
            self.port
        }

        /// Run the accept loop, forwarding requests to the main event loop via mpsc.
        ///
        /// Each accepted connection is handled in a spawned task using the
        /// shared `handle_connection_generic` protocol handler. The loop never
        /// returns. After a failed `accept()` it waits `ACCEPT_RETRY_DELAY`
        /// before trying again.
        pub async fn accept_loop(self, control_tx: mpsc::Sender<ControlMessage>) {
            loop {
                let (stream, addr) = match self.listener.accept().await {
                    Ok(conn) => conn,
                    Err(e) => {
                        warn!(error = %e, "Control socket accept failed");
                        tokio::time::sleep(ACCEPT_RETRY_DELAY).await;
                        continue;
                    }
                };

                // Only accept connections from localhost. The listener is
                // bound to 127.0.0.1, so this is a second line of defence.
                if !addr.ip().is_loopback() {
                    warn!(addr = %addr, "Rejected non-localhost control connection");
                    continue;
                }

                let tx = control_tx.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_connection_generic(stream, tx).await {
                        debug!(error = %e, "Control connection error");
                    }
                });
            }
        }
    }
}
368
369// Re-export platform-specific types
370#[cfg(unix)]
371pub use unix_impl::ControlSocket;
372#[cfg(windows)]
373pub use windows_impl::ControlSocket;
374
#[cfg(test)]
mod tests {
    #[cfg(windows)]
    use super::*;

    /// Binding with port "0" (an OS-assigned ephemeral port) must succeed.
    #[cfg(windows)]
    #[tokio::test]
    async fn test_tcp_control_socket_bind() {
        let ephemeral = ControlConfig {
            enabled: true,
            socket_path: String::from("0"),
        };

        let bound = ControlSocket::bind(&ephemeral);
        let _socket = bound.expect("failed to bind control socket");
    }

    /// An unparsable port must not panic and must fall back to port 21210.
    #[cfg(windows)]
    #[tokio::test]
    async fn test_tcp_control_socket_invalid_port_uses_default() {
        let bogus = ControlConfig {
            enabled: true,
            socket_path: String::from("not-a-port"),
        };

        // Binding the default port can fail when 21210 is already taken on
        // the test machine; that outcome is tolerated, since the point is
        // that invalid input is handled without panicking.
        let Ok(socket) = ControlSocket::bind(&bogus) else {
            return;
        };
        assert_eq!(socket.port(), 21210);
    }
}