1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use failure;
use failure::Fail;
use std::io::Write;
use std::net;
use std::sync;
use backends::Backend;
use errors::{Error, Result};
use message::WireMessage;
pub struct TcpBackend {
socket: sync::Arc<sync::Mutex<net::TcpStream>>,
}
impl TcpBackend {
pub fn new<T: net::ToSocketAddrs>(destination: T) -> Result<TcpBackend> {
let socket = net::TcpStream::connect(destination).map_err(|e| {
failure::Error::from(e)
.context("Failed to establish TCP connection")
.context(Error::BackendCreationFailed)
})?;
socket.set_nonblocking(true).map_err(|e| {
e.context("Failed to set TcpStream to non-blocking mode")
.context(Error::BackendCreationFailed)
})?;
Ok(TcpBackend {
socket: sync::Arc::new(sync::Mutex::new(socket)),
})
}
}
impl Backend for TcpBackend {
fn log_message(&self, msg: WireMessage) -> Result<()> {
let mut msg: Vec<u8> = msg.to_gelf()?.into();
msg.push(0x00);
let mut socket = self.socket.lock().unwrap();
socket
.write_all(&msg)
.map_err(|e| e.context(Error::LogTransmitFailed))?;
Ok(())
}
}
impl Drop for TcpBackend {
fn drop(&mut self) {
let mut socket = self.socket.lock().unwrap();
socket
.flush()
.and_then(|_| socket.shutdown(net::Shutdown::Both))
.unwrap_or_else(|_| warn!("Failed to flush and shutdown tcp socket cleanly"));
}
}