use std::net;
use std::io::Write;
use std::sync;
use backends::Backend;
use message::{WireMessage, MessageCompression};
use errors::{Result, ErrorKind, ResultExt};
/// Logging backend that sends GELF-encoded messages over a TCP connection.
///
/// The stream is connected and switched to non-blocking mode in `new`.
pub struct TcpBackend {
/// Connected TCP stream; the mutex is locked for every write and on drop.
socket: sync::Arc<sync::Mutex<net::TcpStream>>,
/// Compression setting handed to `to_compressed_gelf` for each message.
compression: MessageCompression,
}
impl TcpBackend {
    /// Connects to `destination` and builds a backend around the resulting stream.
    ///
    /// The stream is put into non-blocking mode and the compression setting
    /// starts out as `MessageCompression::default()`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BackendCreationFailed` when the connection cannot be
    /// established or when the stream cannot be switched to non-blocking mode.
    ///
    /// NOTE(review): with a non-blocking stream, `write_all` in `log_message`
    /// can presumably fail with `WouldBlock` once the send buffer is full,
    /// possibly after a partial write — confirm this is the intended trade-off.
    pub fn new<T>(destination: T) -> Result<TcpBackend>
    where
        T: net::ToSocketAddrs,
    {
        let stream = net::TcpStream::connect(destination)
            .chain_err(|| ErrorKind::BackendCreationFailed("Failed to establish TCP connection"))?;

        stream.set_nonblocking(true).chain_err(|| {
            ErrorKind::BackendCreationFailed("Failed to set TcpStream to non-blocking mode")
        })?;

        let backend = TcpBackend {
            socket: sync::Arc::new(sync::Mutex::new(stream)),
            compression: MessageCompression::default(),
        };
        Ok(backend)
    }

    /// Returns the compression setting currently applied to outgoing messages.
    pub fn compression(&self) -> MessageCompression {
        self.compression
    }

    /// Replaces the compression setting used for subsequent messages.
    ///
    /// Returns `&mut Self` so that calls can be chained.
    pub fn set_compression(&mut self, compression: MessageCompression) -> &mut Self {
        self.compression = compression;
        self
    }
}
impl Backend for TcpBackend {
    /// Serializes `msg` to (optionally compressed) GELF and writes it to the
    /// TCP stream, followed by a single `0x00` byte.
    ///
    /// # Errors
    ///
    /// Propagates any error from `to_compressed_gelf`, and returns
    /// `ErrorKind::LogTransmitFailed` when the write to the stream fails.
    ///
    /// NOTE(review): the trailing null byte looks like the GELF-over-TCP frame
    /// delimiter; compressed payloads may themselves contain `0x00` bytes, so
    /// confirm that the receiving end accepts compression on this transport.
    fn log_message(&self, msg: WireMessage) -> Result<()> {
        // Encode before taking the lock so the mutex is held only for the write.
        let mut frame = msg.to_compressed_gelf(self.compression)?;
        frame.push(0x00);

        // A poisoned mutex only means another thread panicked while holding the
        // guard; the stream itself is still usable, so recover the guard instead
        // of panicking inside the caller's logging path.
        let mut socket = self
            .socket
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        socket
            .write_all(&frame)
            .chain_err(|| ErrorKind::LogTransmitFailed)
    }
}
impl Drop for TcpBackend {
    /// Flushes the stream and shuts down both directions of the connection.
    ///
    /// This is best-effort: a failure is reported through `warn!` and never
    /// propagated, since `drop` cannot return an error.
    fn drop(&mut self) {
        // Never panic in `drop`: a panic here while the thread is already
        // unwinding aborts the process. Recover the guard from a poisoned mutex
        // so the connection is still closed.
        let mut socket = self
            .socket
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // Shutdown is attempted only when the flush succeeded.
        let outcome = socket
            .flush()
            .and_then(|_| socket.shutdown(net::Shutdown::Both));

        if outcome.is_err() {
            warn!("Failed to flush and shutdown tcp socket cleanly");
        }
    }
}