logfence_client/
transport.rs1use std::{path::PathBuf, time::Duration};
18
19use tokio::{
20 io::AsyncWriteExt, net::unix::OwnedWriteHalf, net::UnixDatagram, net::UnixStream, sync::Mutex,
21};
22
23use logfence_proto::syslog::SyslogMessage;
24
25use crate::error::ClientError;
26
/// Reports whether `e` signals a transient "no buffer space right now" condition.
///
/// An error qualifies when its kind is [`std::io::ErrorKind::WouldBlock`] or when
/// its raw OS error code is `105` or `55`.
fn is_buffer_full(e: &std::io::Error) -> bool {
    if e.kind() == std::io::ErrorKind::WouldBlock {
        return true;
    }
    // NOTE(review): 105 and 55 look like ENOBUFS on Linux and on macOS/BSD
    // respectively -- confirm against the platforms this crate targets.
    match e.raw_os_error() {
        Some(code) => code == 105 || code == 55,
        None => false,
    }
}
32
/// Computes the pause to take before datagram send attempt number `attempt`.
///
/// The delay is 100 microseconds for attempts 0 through 2 and doubles with each
/// further attempt, capped at one second. Overflow in the shift or the
/// multiplication saturates, so arbitrarily large attempt numbers still yield
/// the one-second cap.
fn dgram_attempt_delay(attempt: u32) -> Duration {
    const BASE_MICROS: u64 = 100;
    const CAP: Duration = Duration::from_secs(1);

    // Attempts up to and including 2 all use exponent 0, i.e. the base delay.
    let exponent = attempt.saturating_sub(2);
    let micros = match 1u64.checked_shl(exponent) {
        Some(factor) => BASE_MICROS.saturating_mul(factor),
        // A shift of 64 bits or more cannot be represented; treat it as "huge".
        None => u64::MAX,
    };
    Duration::from_micros(micros).min(CAP)
}
49
/// A destination that syslog messages can be sent to.
///
/// Implementors must be `Send + Sync`. This file implements the trait for
/// `UnixTransport` (stream socket) and `UnixDatagramTransport` (datagram socket).
#[allow(
    async_fn_in_trait,
    reason = "Transport is only implemented within this crate; \
              the implementation produces Send futures due to its Send-safe state"
)]
pub trait Transport: Send + Sync {
    /// Sends a single syslog message over this transport.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] when the message cannot be delivered.
    async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError>;
}
72
/// Syslog transport that writes length-prefixed frames to a Unix stream socket.
///
/// The connection is opened lazily by `send` and cached between calls.
pub struct UnixTransport {
    /// Filesystem path of the Unix socket to connect to.
    path: PathBuf,
    /// Largest serialized message, in bytes, that `send` will accept.
    max_size: usize,
    /// Cached write half of the connection; `None` until the first successful
    /// connect and again after a failed write.
    stream: Mutex<Option<OwnedWriteHalf>>,
}
90
91impl UnixTransport {
92 #[must_use]
97 pub fn new(path: impl Into<PathBuf>, max_size: usize) -> Self {
98 Self {
99 path: path.into(),
100 max_size,
101 stream: Mutex::new(None),
102 }
103 }
104}
105
106impl Transport for UnixTransport {
107 async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError> {
108 let wire = msg.to_string();
109 if wire.len() > self.max_size {
110 return Err(ClientError::MessageTooLarge {
111 max: self.max_size,
112 got: wire.len(),
113 });
114 }
115 let frame = format!("{} {wire}", wire.len());
117 let frame_bytes = frame.as_bytes();
118
119 let mut guard = self.stream.lock().await;
120
121 if guard.is_none() {
122 let conn = UnixStream::connect(&self.path).await?;
125 let std_conn = conn.into_std()?;
126 std_conn.shutdown(std::net::Shutdown::Read)?;
127 let conn = UnixStream::from_std(std_conn)?;
128 let (_, write_half) = conn.into_split();
129 *guard = Some(write_half);
130 }
131
132 let Some(stream) = guard.as_mut() else {
134 return Err(ClientError::Io(std::io::Error::other(
135 "internal: Unix stream not initialised",
136 )));
137 };
138
139 if let Err(e) = stream.write_all(frame_bytes).await {
140 *guard = None;
142 return Err(ClientError::Io(e));
143 }
144
145 Ok(())
146 }
147}
148
/// Syslog transport that sends each message as one datagram to a Unix socket.
///
/// The sending socket is created lazily by `send` and cached between calls.
pub struct UnixDatagramTransport {
    /// Filesystem path of the Unix datagram socket to send to.
    path: PathBuf,
    /// Largest serialized message, in bytes, that `send` will accept.
    max_size: usize,
    /// Upper bound on send attempts while the buffer is full; `0` removes the bound.
    max_attempts: u32,
    /// Cached unbound sending socket; `None` until first use and again after a
    /// failed send.
    socket: Mutex<Option<UnixDatagram>>,
}
171
172impl UnixDatagramTransport {
173 #[must_use]
183 pub fn new(path: impl Into<PathBuf>, max_size: usize) -> Self {
184 Self {
185 path: path.into(),
186 max_size,
187 max_attempts: 4,
188 socket: Mutex::new(None),
189 }
190 }
191
192 #[must_use]
198 pub fn max_attempts(mut self, n: u32) -> Self {
199 self.max_attempts = n;
200 self
201 }
202}
203
204impl Transport for UnixDatagramTransport {
205 async fn send(&self, msg: &SyslogMessage) -> Result<(), ClientError> {
206 let wire = msg.to_string();
207 if wire.len() > self.max_size {
208 return Err(ClientError::MessageTooLarge {
209 max: self.max_size,
210 got: wire.len(),
211 });
212 }
213
214 let mut guard = self.socket.lock().await;
215
216 if guard.is_none() {
217 let sock = UnixDatagram::unbound()?;
218 if let Err(e) = sock.shutdown(std::net::Shutdown::Read) {
222 if e.kind() != std::io::ErrorKind::NotConnected {
223 return Err(ClientError::Io(e));
224 }
225 }
226 *guard = Some(sock);
227 }
228
229 let Some(sock) = guard.as_ref() else {
231 return Err(ClientError::Io(std::io::Error::other(
232 "internal: Unix datagram socket not initialised",
233 )));
234 };
235
236 let mut last_err = match sock.try_send_to(wire.as_bytes(), &self.path) {
240 Ok(_) => return Ok(()),
241 Err(e) if !is_buffer_full(&e) => {
242 *guard = None;
243 return Err(ClientError::Io(e));
244 }
245 Err(e) => e,
246 };
247 let mut attempt = 2u32;
248 loop {
249 if self.max_attempts != 0 && attempt > self.max_attempts {
250 break;
251 }
252 tokio::time::sleep(dgram_attempt_delay(attempt)).await;
253 match sock.try_send_to(wire.as_bytes(), &self.path) {
254 Ok(_) => return Ok(()),
255 Err(e) if !is_buffer_full(&e) => {
256 *guard = None;
257 return Err(ClientError::Io(e));
258 }
259 Err(e) => last_err = e,
260 }
261 attempt = attempt.saturating_add(1);
262 }
263 *guard = None;
264 Err(ClientError::Io(last_err))
265 }
266}
267
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    reason = "unwrap is appropriate in test assertions"
)]
mod tests {
    use std::time::Duration;

    use tokio::io::AsyncReadExt;
    use tokio::net::UnixListener;

    use logfence_proto::syslog::{Facility, Priority, Severity};

    use super::*;

    /// Builds a small message used as the payload in every test.
    fn sample_msg() -> SyslogMessage {
        SyslogMessage {
            priority: Priority {
                facility: Facility::Local0,
                severity: Severity::Info,
            },
            timestamp: None,
            hostname: None,
            app_name: Some("test".into()),
            proc_id: None,
            msg_id: None,
            structured_data: "-".into(),
            msg: r#"{"k":"v"}"#.into(),
        }
    }

    /// The stream transport writes `<len> <message>` where `<len>` is the
    /// byte length of the serialized message.
    #[tokio::test]
    async fn unix_transport_sends_octet_count_frame() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("test.sock");
        let listener = UnixListener::bind(&sock_path).unwrap();

        let transport = UnixTransport::new(&sock_path, 65536);
        let msg = sample_msg();
        let expected_wire = msg.to_string();

        let send_task = tokio::spawn(async move { transport.send(&msg).await.unwrap() });

        let (mut conn, _) = tokio::time::timeout(Duration::from_secs(1), listener.accept())
            .await
            .unwrap()
            .unwrap();

        let mut buf = vec![0u8; 4096];
        let n = tokio::time::timeout(Duration::from_secs(1), conn.read(&mut buf))
            .await
            .unwrap()
            .unwrap();
        let received = std::str::from_utf8(&buf[..n]).unwrap();

        let (count_str, body) = received.split_once(' ').unwrap();
        assert_eq!(count_str.parse::<usize>().unwrap(), expected_wire.len());
        assert_eq!(body, expected_wire);

        send_task.await.unwrap();
    }

    /// A failed send (no listener yet) must not prevent a later send from
    /// connecting once the listener exists.
    #[tokio::test]
    async fn unix_transport_reconnects_after_error() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("reconnect.sock");

        let transport = UnixTransport::new(&sock_path, 65536);
        let msg = sample_msg();

        assert!(transport.send(&msg).await.is_err());

        let listener = UnixListener::bind(&sock_path).unwrap();
        let send_task = tokio::spawn({
            let msg = msg.clone();
            async move { transport.send(&msg).await }
        });

        // Unwrap both layers: the outer result is the timeout, the inner one is
        // `accept()` itself. Checking only the outer layer would let a failed
        // accept pass unnoticed. The connection is kept alive until the send
        // has been confirmed.
        let (_conn, _addr) = tokio::time::timeout(Duration::from_secs(1), listener.accept())
            .await
            .unwrap()
            .unwrap();
        assert!(send_task.await.unwrap().is_ok());
    }

    /// Messages longer than `max_size` are rejected by the stream transport.
    #[tokio::test]
    async fn unix_transport_rejects_oversized_message() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("oversize.sock");
        let transport = UnixTransport::new(&sock_path, 10);
        let err = transport.send(&sample_msg()).await.unwrap_err();
        assert!(matches!(err, ClientError::MessageTooLarge { .. }));
    }

    /// The datagram transport sends the serialized message with no framing.
    #[tokio::test]
    async fn unix_datagram_transport_sends_raw_wire() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("dgram.sock");
        let receiver = UnixDatagram::bind(&sock_path).unwrap();

        let transport = UnixDatagramTransport::new(&sock_path, 65536);
        let msg = sample_msg();
        let expected_wire = msg.to_string();

        transport.send(&msg).await.unwrap();

        let mut buf = vec![0u8; 4096];
        let n = tokio::time::timeout(Duration::from_secs(1), receiver.recv(&mut buf))
            .await
            .unwrap()
            .unwrap();
        let received = std::str::from_utf8(&buf[..n]).unwrap();

        assert_eq!(received, expected_wire);
    }

    /// Messages longer than `max_size` are rejected by the datagram transport.
    #[tokio::test]
    async fn unix_datagram_transport_rejects_oversized_message() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("oversize_dgram.sock");
        let transport = UnixDatagramTransport::new(&sock_path, 10);
        let err = transport.send(&sample_msg()).await.unwrap_err();
        assert!(matches!(err, ClientError::MessageTooLarge { .. }));
    }

    /// With the receiver's queue full, a send must succeed once a background
    /// thread drains the queue, i.e. the transport has to retry.
    #[tokio::test]
    async fn unix_datagram_retries_on_buffer_full() {
        let dir = tempfile::tempdir().unwrap();
        let sock_path = dir.path().join("dgram_retry.sock");

        let receiver = std::os::unix::net::UnixDatagram::bind(&sock_path).unwrap();
        socket2::SockRef::from(&receiver)
            .set_recv_buffer_size(4096)
            .unwrap();

        // Fill the receiver's queue with one-byte datagrams until the kernel
        // refuses another one.
        let filler = std::os::unix::net::UnixDatagram::unbound().unwrap();
        filler.set_nonblocking(true).unwrap();
        let mut fill_count = 0usize;
        loop {
            match filler.send_to(&[0u8], &sock_path) {
                Ok(_) => {
                    fill_count += 1;
                    assert!(fill_count < 100_000, "socket buffer never filled");
                }
                Err(e) if super::is_buffer_full(&e) => break,
                // Any other error means the setup itself is broken.
                Err(e) => panic!("unexpected fill error: {e}"),
            }
        }
        assert!(fill_count > 0);

        // Drain the queue shortly after the transport starts sending.
        let drainer = receiver.try_clone().unwrap();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_micros(200));
            let mut buf = vec![0u8; 65_536];
            drainer.set_nonblocking(true).unwrap();
            while drainer.recv(&mut buf).is_ok() {}
        });

        let transport = UnixDatagramTransport::new(&sock_path, 65_536);
        tokio::time::timeout(Duration::from_millis(200), transport.send(&sample_msg()))
            .await
            .unwrap()
            .unwrap();
    }
}