use prost::Message;
use crate::encoder::remote_write::{parse_length_prefixed_timeseries, TimeSeries, WriteRequest};
use crate::sink::retry::RetryPolicy;
use crate::sink::Sink;
use crate::{EncoderError, SondaError};
/// Default batch size for the remote-write sink.
///
/// NOTE(review): this constant is not referenced in the visible part of the
/// file; presumably callers pass it as the `batch_size` argument of
/// `RemoteWriteSink::new` — confirm against the call sites.
pub const DEFAULT_BATCH_SIZE: usize = 5;
/// Sink that buffers `TimeSeries` values and POSTs them in batches to an
/// HTTP(S) endpoint as a snappy-compressed protobuf `WriteRequest`.
pub struct RemoteWriteSink {
/// HTTP agent used to issue the POST requests.
client: ureq::Agent,
/// Target endpoint; `new` only accepts values starting with `http://` or
/// `https://`.
url: String,
/// Time series accumulated by `write` and not yet sent.
batch: Vec<TimeSeries>,
/// Number of buffered series at which `write` triggers a send.
batch_size: usize,
/// Optional retry policy; when present, each send is run through
/// `RetryPolicy::execute`, otherwise a single attempt is made.
retry_policy: Option<RetryPolicy>,
}
impl RemoteWriteSink {
    /// Creates a sink that posts batches to `url`.
    ///
    /// `batch_size` is both the initial capacity of the internal buffer and the
    /// number of buffered series at which `write` triggers a send.
    /// `retry_policy`, when `Some`, wraps every send attempt.
    ///
    /// # Errors
    ///
    /// Returns `SondaError::Sink` with `ErrorKind::InvalidInput` when `url` does
    /// not start with `http://` or `https://`. No other URL validation is done.
    pub fn new(
        url: &str,
        batch_size: usize,
        retry_policy: Option<RetryPolicy>,
    ) -> Result<Self, SondaError> {
        if !url.starts_with("http://") && !url.starts_with("https://") {
            return Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "invalid remote write URL '{}': must start with http:// or https://",
                    url
                ),
            )));
        }
        let client = ureq::AgentBuilder::new().build();
        Ok(Self {
            client,
            url: url.to_owned(),
            batch: Vec::with_capacity(batch_size),
            batch_size,
            retry_policy,
        })
    }

    /// Encodes the buffered series as a protobuf `WriteRequest`, snappy-compresses
    /// it and POSTs it, going through the retry policy when one is configured.
    ///
    /// The buffer is emptied before the request is built and is not restored,
    /// so the batch is dropped whether the send succeeds or fails.
    ///
    /// # Errors
    ///
    /// Returns `SondaError::Encoder` when protobuf encoding or snappy
    /// compression fails, and `SondaError::Sink` when the POST ultimately fails
    /// with anything other than a permanent client rejection. A permanent
    /// rejection (4xx other than 429) is reported as `Ok(())` because the batch
    /// has been deliberately discarded.
    fn send_batch(&mut self) -> Result<(), SondaError> {
        if self.batch.is_empty() {
            return Ok(());
        }
        let write_request = WriteRequest {
            timeseries: std::mem::take(&mut self.batch),
        };
        let encoded_len = write_request.encoded_len();
        let mut proto_bytes = Vec::with_capacity(encoded_len);
        write_request.encode(&mut proto_bytes).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!("protobuf encode error: {e}")))
        })?;
        let mut snappy_encoder = snap::raw::Encoder::new();
        let compressed = snappy_encoder.compress_vec(&proto_bytes).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!(
                "snappy compression error: {e}"
            )))
        })?;
        let result = match &self.retry_policy {
            Some(policy) => {
                let policy = policy.clone();
                policy.execute(|| self.do_post_checked(&compressed), Self::is_retryable)
            }
            None => self.do_post_checked(&compressed),
        };
        // A permanent rejection was already logged by `do_post_checked`; the
        // batch is gone, so there is nothing left for the caller to act on.
        match &result {
            Err(err) if Self::is_permanent_rejection(err) => Ok(()),
            _ => result,
        }
    }

    /// POSTs `body` and maps the HTTP status onto a `Result`.
    ///
    /// * 2xx: `Ok(())`.
    /// * 4xx other than 429: logs to stderr and returns a `SondaError::Sink`
    ///   carrying `ErrorKind::InvalidInput`, which marks the rejection as
    ///   permanent.
    /// * Any other status: a `SondaError::Sink` built with
    ///   `std::io::Error::other`.
    ///
    /// Transport failures from `do_post` are propagated unchanged.
    fn do_post_checked(&self, body: &[u8]) -> Result<(), SondaError> {
        let status = self.do_post(body)?;
        if (200..300).contains(&status) {
            return Ok(());
        }
        if (400..500).contains(&status) && status != 429 {
            eprintln!(
                "sonda: remote_write sink: received HTTP {} from '{}'; discarding batch",
                status, self.url
            );
            return Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("HTTP {} from '{}'", status, self.url),
            )));
        }
        Err(SondaError::Sink(std::io::Error::other(format!(
            "HTTP {} from '{}'",
            status, self.url
        ))))
    }

    /// Returns `true` when `err` is the permanent-rejection error produced by
    /// `do_post_checked`, i.e. a `SondaError::Sink` whose kind is
    /// `ErrorKind::InvalidInput`.
    fn is_permanent_rejection(err: &SondaError) -> bool {
        matches!(
            err,
            SondaError::Sink(io_err) if io_err.kind() == std::io::ErrorKind::InvalidInput
        )
    }

    /// Tells the retry policy whether another attempt is worthwhile.
    ///
    /// Only `SondaError::Sink` errors are retried, and only when they are not a
    /// permanent rejection. The decision is keyed on the error kind rather than
    /// the message text, because the message embeds the configured URL and the
    /// transport error and could accidentally contain a status-like substring.
    fn is_retryable(err: &SondaError) -> bool {
        matches!(err, SondaError::Sink(_)) && !Self::is_permanent_rejection(err)
    }

    /// Issues the HTTP POST with the remote-write headers and returns the
    /// response status code.
    ///
    /// Non-success statuses that `ureq` reports as `Error::Status` are returned
    /// as `Ok(code)` so the caller can classify them.
    ///
    /// # Errors
    ///
    /// Any other `ureq` error is wrapped in a `SondaError::Sink`. Its kind is
    /// always `ErrorKind::ConnectionRefused`, whatever the underlying cause.
    fn do_post(&self, body: &[u8]) -> Result<u16, SondaError> {
        let response = self
            .client
            .post(&self.url)
            .set("Content-Type", "application/x-protobuf")
            .set("Content-Encoding", "snappy")
            .set("X-Prometheus-Remote-Write-Version", "0.1.0")
            .send_bytes(body);
        match response {
            Ok(resp) => Ok(resp.status()),
            Err(ureq::Error::Status(code, _)) => Ok(code),
            Err(e) => Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                format!("remote write to '{}' failed: {}", self.url, e),
            ))),
        }
    }
}
impl Sink for RemoteWriteSink {
    /// Parses `data` as length-prefixed time series, appends them to the
    /// pending batch, and sends the batch once it holds at least `batch_size`
    /// entries.
    ///
    /// Parse failures and send failures are returned to the caller.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
        self.batch.extend(parse_length_prefixed_timeseries(data)?);

        // Below the threshold: keep buffering.
        if self.batch.len() < self.batch_size {
            return Ok(());
        }
        self.send_batch()
    }

    /// Sends whatever is currently buffered, regardless of the batch size.
    fn flush(&mut self) -> Result<(), SondaError> {
        self.send_batch()
    }
}