use bon::bon;
use std::{
fmt,
io::{Error, Write},
net::{AddrParseError, IpAddr, SocketAddr, TcpStream, UdpSocket},
str::FromStr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
const DEFAULT_RETRIES: u8 = 3;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_TCP_TTL: Duration = Duration::from_secs(240);
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
#[default]
Tcp,
Udp,
}
#[derive(Debug)]
enum Connection {
Tcp(TcpStream),
Udp(UdpSocket),
}
#[derive(Debug)]
pub struct GraphiteClient {
connection: Connection,
sock_addr: SocketAddr,
_address: String,
_port: u16,
protocol: Protocol,
retries: u8,
timeout: Duration,
tcp_ttl: Duration,
}
#[bon]
impl GraphiteClient {
#[builder]
pub fn new(
address: impl Into<String>,
port: u16,
#[builder(default = Protocol::default())]
protocol: Protocol,
#[builder(default = DEFAULT_RETRIES)]
retries: u8,
#[builder(default = DEFAULT_TIMEOUT)]
timeout: Duration,
#[builder(default = DEFAULT_TCP_TTL)]
tcp_ttl: Duration,
) -> Result<Self, GraphiteError> {
let address = address.into();
let sock_addr = SocketAddr::new(IpAddr::from_str(&address)?, port);
let connection = match protocol {
Protocol::Tcp => {
let tcp_stream = TcpStream::connect_timeout(&sock_addr, timeout)?;
tcp_stream.set_ttl(tcp_ttl.as_secs() as u32)?;
tcp_stream.set_nodelay(true)?;
Connection::Tcp(tcp_stream)
}
Protocol::Udp => {
let udp_socket = UdpSocket::bind("0.0.0.0:0")?;
udp_socket.connect(sock_addr)?;
Connection::Udp(udp_socket)
}
};
Ok(Self {
connection,
sock_addr,
_address: address,
_port: port,
protocol,
retries,
timeout,
tcp_ttl,
})
}
pub fn reconnect(&mut self) -> Result<(), GraphiteError> {
if self.protocol == Protocol::Udp {
return Ok(());
}
let mut last_err: Error = Error::last_os_error();
let mut i = 0;
while i < self.retries {
let connect = TcpStream::connect_timeout(&self.sock_addr, self.timeout);
match connect {
Ok(tcp) => {
tcp.set_ttl(self.tcp_ttl.as_secs() as u32)?;
tcp.set_nodelay(true)?;
self.connection = Connection::Tcp(tcp);
return Ok(());
}
Err(err) => last_err = err,
}
i += 1;
}
Err(GraphiteError {
msg: format!("Graphite Error: {last_err}"),
})
}
pub fn send_message(&mut self, msg: &GraphiteMessage) -> Result<usize, GraphiteError> {
let data = msg.to_string();
match &mut self.connection {
Connection::Udp(udp_socket) => {
udp_socket.send(data.as_bytes())?;
Ok(data.len())
}
Connection::Tcp(_) => {
let mut last_err: Error = Error::last_os_error();
for _ in 0..self.retries {
if let Connection::Tcp(tcp_stream) = &mut self.connection {
let res = tcp_stream.write_all(data.as_bytes());
match res {
Ok(_) => return Ok(data.len()),
Err(err) => {
last_err = err;
self.reconnect()?;
}
}
}
}
Err(GraphiteError {
msg: format!("Graphite Error: {last_err}"),
})
}
}
}
pub fn send_batch_message(&mut self, msgs: &[GraphiteMessage]) -> Result<usize, GraphiteError> {
match &mut self.connection {
Connection::Udp(udp_socket) => {
const MAX_UDP_SIZE: usize = 1400;
let mut total_sent_bytes = 0;
let mut current_batch = String::with_capacity(MAX_UDP_SIZE);
for msg in msgs {
let msg_str = msg.to_string();
if !current_batch.is_empty()
&& current_batch.len() + msg_str.len() > MAX_UDP_SIZE
{
udp_socket.send(current_batch.as_bytes())?;
total_sent_bytes += current_batch.len();
current_batch.clear();
}
if msg_str.len() > MAX_UDP_SIZE {
udp_socket.send(msg_str.as_bytes())?;
total_sent_bytes += msg_str.len();
} else {
current_batch.push_str(&msg_str);
}
}
if !current_batch.is_empty() {
udp_socket.send(current_batch.as_bytes())?;
total_sent_bytes += current_batch.len();
}
Ok(total_sent_bytes)
}
Connection::Tcp(_) => {
let combined: String = msgs.iter().map(ToString::to_string).collect();
let mut last_err: Error = Error::last_os_error();
for _ in 0..self.retries {
if let Connection::Tcp(tcp_stream) = &mut self.connection {
let res = tcp_stream.write_all(combined.as_bytes());
match res {
Ok(_) => return Ok(combined.len()),
Err(err) => {
last_err = err;
self.reconnect()?;
}
}
}
}
Err(GraphiteError {
msg: format!("Graphite Error: {last_err}"),
})
}
}
}
}
impl Drop for GraphiteClient {
fn drop(&mut self) {
if let Connection::Tcp(tcp_stream) = &self.connection {
let _ = tcp_stream.shutdown(std::net::Shutdown::Both);
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct GraphiteMessage {
metric_path: String,
value: String,
timestamp: u64,
}
impl GraphiteMessage {
pub fn new(metric_path: &str, value: &str) -> Self {
Self {
metric_path: metric_path.to_string(),
value: value.to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}
}
pub fn new_with_ts(metric_path: &str, value: &str, ts: u64) -> Self {
Self {
metric_path: metric_path.to_string(),
value: value.to_string(),
timestamp: ts,
}
}
}
impl fmt::Display for GraphiteMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "{} {} {}", self.metric_path, self.value, self.timestamp)
}
}
#[derive(Clone)]
pub struct GraphiteError {
pub msg: String,
}
impl fmt::Display for GraphiteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.msg)
}
}
impl fmt::Debug for GraphiteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "GraphiteError {{ msg: {:?} }}", self.msg)
}
}
impl std::error::Error for GraphiteError {}
impl From<AddrParseError> for GraphiteError {
fn from(err: AddrParseError) -> Self {
GraphiteError {
msg: err.to_string(),
}
}
}
impl From<Error> for GraphiteError {
fn from(err: Error) -> Self {
GraphiteError {
msg: err.to_string(),
}
}
}