Skip to main content

logfence_client/
transport.rs

1//! Transport trait and implementations for delivering syslog messages.
2//!
3//! Two transports are provided:
4//!
5//! - [`UnixTransport`] — connects to a Unix stream socket and sends messages
6//!   using RFC 6587 §3.4.1 octet-count framing. This is the correct transport
7//!   for talking to a running `logfenced` daemon or to rsyslog's `imuxsock`.
8//!   The connection is established lazily on first send and re-established
9//!   automatically after any I/O error.
10//!
11//! - [`UnixDatagramTransport`] — sends each message as a single datagram to a
12//!   Unix socket. This is the correct transport for talking to rsyslog's
13//!   standard `imuxsock` datagram input, or to a `logfenced` instance
14//!   configured with `listen_transport = "unix_dgram"`. No framing is added;
15//!   the datagram boundary is the message boundary.
16
17use std::{path::PathBuf, time::Duration};
18
19use tokio::{
20    io::AsyncWriteExt, net::unix::OwnedWriteHalf, net::UnixDatagram, net::UnixStream, sync::Mutex,
21};
22
23use logfence_proto::syslog::SyslogMessage;
24
25use crate::error::ClientError;
26
27// ── Datagram retry ────────────────────────────────────────────────────────────
28
29fn is_buffer_full(e: &std::io::Error) -> bool {
30    matches!(e.kind(), std::io::ErrorKind::WouldBlock) || matches!(e.raw_os_error(), Some(105 | 55))
31}
32
33// Delay before the Nth attempt (1-indexed; no delay before attempt 1).
34// Starts at 100 µs for attempt 2 and doubles each subsequent attempt,
35// capped at 1 s.
36fn dgram_attempt_delay(attempt: u32) -> Duration {
37    let max = Duration::from_secs(1);
38    let shift = attempt.saturating_sub(2);
39    let micros = 1u64
40        .checked_shl(shift)
41        .map_or(u64::MAX, |v| 100u64.saturating_mul(v));
42    let delay = Duration::from_micros(micros);
43    if delay > max {
44        max
45    } else {
46        delay
47    }
48}
49
50// ── Transport trait ───────────────────────────────────────────────────────────
51
52/// Deliver a [`SyslogMessage`] to a syslog endpoint.
53///
54/// Implementors must be `Send + Sync` so they can be shared across Tokio tasks.
55/// The default implementation ([`UnixTransport`]) handles connection management
56/// internally.
57#[allow(
58    async_fn_in_trait,
59    reason = "Transport is only implemented within this crate; \
60              the implementation produces Send futures due to its Send-safe state"
61)]
62pub trait Transport: Send + Sync {
63    /// Send a single syslog message.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`ClientError::Io`] on I/O failure or
68    /// [`ClientError::MessageTooLarge`] if the rendered message exceeds the
69    /// transport's configured size limit.
70    async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError>;
71}
72
73// ── UnixTransport ─────────────────────────────────────────────────────────────
74
75/// RFC 6587 §3.4.1 octet-count framing over a Unix stream socket.
76///
77/// Connects lazily on the first [`send`](Transport::send) call. If the
78/// connection is lost, the next `send` re-establishes it automatically.
79///
80/// Only the write half of the underlying socket is retained.  The read half
81/// is shut down (`SHUT_RD`) immediately after connecting, enforcing write-only
82/// direction at both the type level and the OS level.
83///
84/// Thread-safe: the socket is protected by a [`tokio::sync::Mutex`].
85pub struct UnixTransport {
86    path: PathBuf,
87    max_size: usize,
88    stream: Mutex<Option<OwnedWriteHalf>>,
89}
90
91impl UnixTransport {
92    /// Create a transport that will connect to the Unix socket at `path`.
93    ///
94    /// `max_size` is the maximum accepted wire message size in bytes.
95    /// Use `65536` for the logfenced default.
96    #[must_use]
97    pub fn new(path: impl Into<PathBuf>, max_size: usize) -> Self {
98        Self {
99            path: path.into(),
100            max_size,
101            stream: Mutex::new(None),
102        }
103    }
104}
105
106impl Transport for UnixTransport {
107    async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError> {
108        let wire = msg.to_string();
109        if wire.len() > self.max_size {
110            return Err(ClientError::MessageTooLarge {
111                max: self.max_size,
112                got: wire.len(),
113            });
114        }
115        // RFC 6587 §3.4.1: prepend "<byte-count> " before the message.
116        let frame = format!("{} {wire}", wire.len());
117        let frame_bytes = frame.as_bytes();
118
119        let mut guard = self.stream.lock().await;
120
121        if guard.is_none() {
122            // Connect, then enforce write-only direction by shutting down the
123            // read half at the OS level before splitting.
124            let conn = UnixStream::connect(&self.path).await?;
125            let std_conn = conn.into_std()?;
126            std_conn.shutdown(std::net::Shutdown::Read)?;
127            let conn = UnixStream::from_std(std_conn)?;
128            let (_, write_half) = conn.into_split();
129            *guard = Some(write_half);
130        }
131
132        // `guard` is `Some` — we just set it above if it was `None`.
133        let Some(stream) = guard.as_mut() else {
134            return Err(ClientError::Io(std::io::Error::other(
135                "internal: Unix stream not initialised",
136            )));
137        };
138
139        if let Err(e) = stream.write_all(frame_bytes).await {
140            // Drop the broken connection; next call will reconnect.
141            *guard = None;
142            return Err(ClientError::Io(e));
143        }
144
145        Ok(())
146    }
147}
148
149// ── UnixDatagramTransport ─────────────────────────────────────────────────────
150
151/// Write-only Unix datagram transport.
152///
153/// Sends each [`SyslogMessage`] as a single datagram to the configured socket
154/// path.  This matches the framing expected by rsyslog's `imuxsock` datagram
155/// input and by a `logfenced` instance configured with
156/// `listen_transport = "unix_dgram"`.
157///
158/// The socket is created unbound.  On Linux, `SHUT_RD` is issued immediately
159/// to enforce write-only direction at the OS level; macOS rejects that call on
160/// unconnected sockets, so write-only is type-enforced only on that platform.
161/// There is no framing — the datagram boundary is the message boundary.
162///
163/// Thread-safe: the socket is protected by a [`tokio::sync::Mutex`].
164pub struct UnixDatagramTransport {
165    path: PathBuf,
166    max_size: usize,
167    /// Total send attempts on `ENOBUFS`.  `0` = unlimited.  Default: `4`.
168    max_attempts: u32,
169    socket: Mutex<Option<UnixDatagram>>,
170}
171
172impl UnixDatagramTransport {
173    /// Create a transport that will send to the Unix datagram socket at `path`.
174    ///
175    /// `max_size` is the maximum accepted wire message size in bytes.
176    /// Use `65536` for the standard datagram limit.
177    ///
178    /// The default retry limit is 4 attempts.  Use [`max_attempts`] to change
179    /// it.
180    ///
181    /// [`max_attempts`]: UnixDatagramTransport::max_attempts
182    #[must_use]
183    pub fn new(path: impl Into<PathBuf>, max_size: usize) -> Self {
184        Self {
185            path: path.into(),
186            max_size,
187            max_attempts: 4,
188            socket: Mutex::new(None),
189        }
190    }
191
192    /// Set the maximum number of datagram send attempts.
193    ///
194    /// `0` means unlimited — retry until the send succeeds or a non-retryable
195    /// error occurs.  Attempt 1 is immediate; attempt 2 waits 100 µs, and
196    /// each subsequent attempt doubles the delay until reaching the 1 s cap.
197    #[must_use]
198    pub fn max_attempts(mut self, n: u32) -> Self {
199        self.max_attempts = n;
200        self
201    }
202}
203
204impl Transport for UnixDatagramTransport {
205    async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError> {
206        let wire = msg.to_string();
207        if wire.len() > self.max_size {
208            return Err(ClientError::MessageTooLarge {
209                max: self.max_size,
210                got: wire.len(),
211            });
212        }
213
214        let mut guard = self.socket.lock().await;
215
216        if guard.is_none() {
217            let sock = UnixDatagram::unbound()?;
218            // Enforce write-only direction at OS level. macOS returns ENOTCONN
219            // for unconnected datagram sockets; safe to ignore since an unbound
220            // socket has no address and cannot receive unsolicited data.
221            if let Err(e) = sock.shutdown(std::net::Shutdown::Read) {
222                if e.kind() != std::io::ErrorKind::NotConnected {
223                    return Err(ClientError::Io(e));
224                }
225            }
226            *guard = Some(sock);
227        }
228
229        // `guard` is `Some` — we just set it above if it was `None`.
230        let Some(sock) = guard.as_ref() else {
231            return Err(ClientError::Io(std::io::Error::other(
232                "internal: Unix datagram socket not initialised",
233            )));
234        };
235
236        // Use try_send_to (non-blocking) so buffer-full errors (EAGAIN on Linux,
237        // ENOBUFS on macOS) reach is_buffer_full and the retry schedule runs
238        // under our control instead of Tokio's internal re-queue.
239        let mut last_err = match sock.try_send_to(wire.as_bytes(), &self.path) {
240            Ok(_) => return Ok(()),
241            Err(e) if !is_buffer_full(&e) => {
242                *guard = None;
243                return Err(ClientError::Io(e));
244            }
245            Err(e) => e,
246        };
247        let mut attempt = 2u32;
248        loop {
249            if self.max_attempts != 0 && attempt > self.max_attempts {
250                break;
251            }
252            tokio::time::sleep(dgram_attempt_delay(attempt)).await;
253            match sock.try_send_to(wire.as_bytes(), &self.path) {
254                Ok(_) => return Ok(()),
255                Err(e) if !is_buffer_full(&e) => {
256                    *guard = None;
257                    return Err(ClientError::Io(e));
258                }
259                Err(e) => last_err = e,
260            }
261            attempt = attempt.saturating_add(1);
262        }
263        *guard = None;
264        Err(ClientError::Io(last_err))
265    }
266}
267
268// ── Tests ─────────────────────────────────────────────────────────────────────
269
270#[cfg(test)]
271#[allow(
272    clippy::unwrap_used,
273    reason = "unwrap is appropriate in test assertions"
274)]
275mod tests {
276    use std::time::Duration;
277
278    use tokio::io::AsyncReadExt;
279    use tokio::net::UnixListener;
280
281    use logfence_proto::syslog::{Facility, Priority, Severity};
282
283    use super::*;
284
285    fn sample_msg() -> SyslogMessage {
286        SyslogMessage {
287            priority: Priority {
288                facility: Facility::Local0,
289                severity: Severity::Info,
290            },
291            timestamp: None,
292            hostname: None,
293            app_name: Some("test".into()),
294            proc_id: None,
295            msg_id: None,
296            structured_data: "-".into(),
297            msg: r#"{"k":"v"}"#.into(),
298        }
299    }
300
301    #[tokio::test]
302    async fn unix_transport_sends_octet_count_frame() {
303        let dir = tempfile::tempdir().unwrap();
304        let sock_path = dir.path().join("test.sock");
305        let listener = UnixListener::bind(&sock_path).unwrap();
306
307        let transport = UnixTransport::new(&sock_path, 65536);
308        let msg = sample_msg();
309        let expected_wire = msg.to_string();
310
311        let send_task = tokio::spawn(async move { transport.send(&msg).await.unwrap() });
312
313        let (mut conn, _) = tokio::time::timeout(Duration::from_secs(1), listener.accept())
314            .await
315            .unwrap()
316            .unwrap();
317
318        let mut buf = vec![0u8; 4096];
319        let n = tokio::time::timeout(Duration::from_secs(1), conn.read(&mut buf))
320            .await
321            .unwrap()
322            .unwrap();
323        let received = std::str::from_utf8(&buf[..n]).unwrap();
324
325        // Frame must be "<count> <message>"
326        let (count_str, body) = received.split_once(' ').unwrap();
327        assert_eq!(count_str.parse::<usize>().unwrap(), expected_wire.len());
328        assert_eq!(body, expected_wire);
329
330        send_task.await.unwrap();
331    }
332
333    #[tokio::test]
334    async fn unix_transport_reconnects_after_error() {
335        let dir = tempfile::tempdir().unwrap();
336        let sock_path = dir.path().join("reconnect.sock");
337
338        let transport = UnixTransport::new(&sock_path, 65536);
339        let msg = sample_msg();
340
341        // First send fails — no listener yet.
342        assert!(transport.send(&msg).await.is_err());
343
344        // Start listener, second send should succeed.
345        let listener = UnixListener::bind(&sock_path).unwrap();
346        let send_task = tokio::spawn({
347            let msg = msg.clone();
348            async move { transport.send(&msg).await }
349        });
350
351        let accept = tokio::time::timeout(Duration::from_secs(1), listener.accept()).await;
352        assert!(accept.is_ok());
353        assert!(send_task.await.unwrap().is_ok());
354    }
355
356    #[tokio::test]
357    async fn unix_transport_rejects_oversized_message() {
358        let dir = tempfile::tempdir().unwrap();
359        let sock_path = dir.path().join("oversize.sock");
360        let transport = UnixTransport::new(&sock_path, 10); // tiny limit
361
362        let err = transport.send(&sample_msg()).await.unwrap_err();
363        assert!(matches!(err, ClientError::MessageTooLarge { .. }));
364    }
365
366    #[tokio::test]
367    async fn unix_datagram_transport_sends_raw_wire() {
368        let dir = tempfile::tempdir().unwrap();
369        let sock_path = dir.path().join("dgram.sock");
370        let receiver = UnixDatagram::bind(&sock_path).unwrap();
371
372        let transport = UnixDatagramTransport::new(&sock_path, 65536);
373        let msg = sample_msg();
374        let expected_wire = msg.to_string();
375
376        transport.send(&msg).await.unwrap();
377
378        let mut buf = vec![0u8; 4096];
379        let n = tokio::time::timeout(Duration::from_secs(1), receiver.recv(&mut buf))
380            .await
381            .unwrap()
382            .unwrap();
383        let received = std::str::from_utf8(&buf[..n]).unwrap();
384
385        // No framing — the raw RFC 5424 wire format is sent as-is.
386        assert_eq!(received, expected_wire);
387    }
388
389    #[tokio::test]
390    async fn unix_datagram_transport_rejects_oversized_message() {
391        let dir = tempfile::tempdir().unwrap();
392        let sock_path = dir.path().join("oversize_dgram.sock");
393        let transport = UnixDatagramTransport::new(&sock_path, 10); // tiny limit
394
395        let err = transport.send(&sample_msg()).await.unwrap_err();
396        assert!(matches!(err, ClientError::MessageTooLarge { .. }));
397    }
398
399    /// Fill the receiver's buffer with the minimum `SO_RCVBUF`, spawn a drain
400    /// thread that waits 200 µs (past the 100 µs first-retry delay), and verify
401    /// that `send()` recovers via the retry loop.
402    #[tokio::test]
403    async fn unix_datagram_retries_on_buffer_full() {
404        let dir = tempfile::tempdir().unwrap();
405        let sock_path = dir.path().join("dgram_retry.sock");
406
407        let receiver = std::os::unix::net::UnixDatagram::bind(&sock_path).unwrap();
408        socket2::SockRef::from(&receiver)
409            .set_recv_buffer_size(4096)
410            .unwrap();
411
412        let filler = std::os::unix::net::UnixDatagram::unbound().unwrap();
413        filler.set_nonblocking(true).unwrap();
414        let mut fill_count = 0usize;
415        loop {
416            match filler.send_to(&[0u8], &sock_path) {
417                Ok(_) => {
418                    fill_count += 1;
419                    assert!(fill_count < 100_000, "socket buffer never filled");
420                }
421                Err(ref e) if super::is_buffer_full(e) => break,
422                Err(ref e) => {
423                    assert!(super::is_buffer_full(e), "unexpected fill error: {e}");
424                    break;
425                }
426            }
427        }
428        assert!(fill_count > 0);
429
430        let drainer = receiver.try_clone().unwrap();
431        std::thread::spawn(move || {
432            std::thread::sleep(Duration::from_micros(200));
433            let mut buf = vec![0u8; 65_536];
434            drainer.set_nonblocking(true).unwrap();
435            while drainer.recv(&mut buf).is_ok() {}
436        });
437
438        let transport = UnixDatagramTransport::new(&sock_path, 65_536);
439        tokio::time::timeout(Duration::from_millis(200), transport.send(&sample_msg()))
440            .await
441            .unwrap()
442            .unwrap();
443    }
444}