use prost::Message;
use crate::encoder::remote_write::{parse_length_prefixed_timeseries, TimeSeries, WriteRequest};
use crate::sink::Sink;
use crate::{EncoderError, SondaError};
/// Default batch size (number of buffered time series) for a remote write sink.
///
/// NOTE(review): this constant is not referenced in the code visible here;
/// presumably callers pass it as `batch_size` to `RemoteWriteSink::new` — confirm.
pub const DEFAULT_BATCH_SIZE: usize = 100;
/// Sink that buffers `TimeSeries` values and ships them in batches to an HTTP
/// endpoint as a snappy-compressed, protobuf-encoded `WriteRequest`.
pub struct RemoteWriteSink {
/// HTTP agent used for every POST issued by this sink.
client: ureq::Agent,
/// Destination URL; validated at construction to start with `http://` or `https://`.
url: String,
/// Time series accumulated since the last send.
batch: Vec<TimeSeries>,
/// Number of buffered time series at which `write` triggers a send.
batch_size: usize,
}
impl RemoteWriteSink {
pub fn new(url: &str, batch_size: usize) -> Result<Self, SondaError> {
if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(SondaError::Sink(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"invalid remote write URL '{}': must start with http:// or https://",
url
),
)));
}
let client = ureq::AgentBuilder::new().build();
Ok(Self {
client,
url: url.to_owned(),
batch: Vec::with_capacity(batch_size),
batch_size,
})
}
fn send_batch(&mut self) -> Result<(), SondaError> {
if self.batch.is_empty() {
return Ok(());
}
let write_request = WriteRequest {
timeseries: std::mem::take(&mut self.batch),
};
let encoded_len = write_request.encoded_len();
let mut proto_bytes = Vec::with_capacity(encoded_len);
write_request.encode(&mut proto_bytes).map_err(|e| {
SondaError::Encoder(EncoderError::Other(format!("protobuf encode error: {e}")))
})?;
let mut snappy_encoder = snap::raw::Encoder::new();
let compressed = snappy_encoder.compress_vec(&proto_bytes).map_err(|e| {
SondaError::Encoder(EncoderError::Other(format!(
"snappy compression error: {e}"
)))
})?;
let result = self.do_post(&compressed);
match result {
Ok(status) if (200..300).contains(&status) => Ok(()),
Ok(status) if (400..500).contains(&status) => {
eprintln!(
"sonda: remote write sink received {} response from '{}'; discarding batch",
status, self.url
);
Ok(())
}
Ok(status) => {
let retry_result = self.do_post(&compressed);
match retry_result {
Ok(retry_status) if (200..300).contains(&retry_status) => Ok(()),
Ok(retry_status) => Err(SondaError::Sink(std::io::Error::other(format!(
"remote write to '{}' failed with status {} (retry status {})",
self.url, status, retry_status
)))),
Err(e) => Err(e),
}
}
Err(e) => Err(e),
}
}
fn do_post(&self, body: &[u8]) -> Result<u16, SondaError> {
let response = self
.client
.post(&self.url)
.set("Content-Type", "application/x-protobuf")
.set("Content-Encoding", "snappy")
.set("X-Prometheus-Remote-Write-Version", "0.1.0")
.send_bytes(body);
match response {
Ok(resp) => Ok(resp.status()),
Err(ureq::Error::Status(code, _)) => Ok(code),
Err(e) => Err(SondaError::Sink(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("remote write to '{}' failed: {}", self.url, e),
))),
}
}
}
impl Sink for RemoteWriteSink {
    /// Parses `data` with `parse_length_prefixed_timeseries`, appends the
    /// resulting time series to the buffer, and sends the buffer once it
    /// holds at least `batch_size` entries.
    ///
    /// # Errors
    ///
    /// Propagates parse failures and any error from sending the batch.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
        self.batch
            .extend(parse_length_prefixed_timeseries(data)?);

        // Below the threshold: keep buffering.
        if self.batch.len() < self.batch_size {
            return Ok(());
        }
        self.send_batch()
    }

    /// Sends whatever is currently buffered, regardless of batch size.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the batch.
    fn flush(&mut self) -> Result<(), SondaError> {
        self.send_batch()
    }
}