use std::time::{Duration, Instant};
use prost::Message;
use crate::encoder::remote_write::{parse_length_prefixed_timeseries, TimeSeries, WriteRequest};
use crate::sink::retry::RetryPolicy;
use crate::sink::Sink;
use crate::{EncoderError, SondaError};
/// Default number of buffered `TimeSeries` per batch — presumably applied by
/// the sink factory when the config omits `batch_size`; confirm in `create_sink`.
pub const DEFAULT_BATCH_SIZE: usize = 5;
/// A `Sink` that batches Prometheus `TimeSeries` and delivers them to a
/// remote-write endpoint as snappy-compressed protobuf `WriteRequest`s.
pub struct RemoteWriteSink {
    // HTTP client reused across batches.
    client: ureq::Agent,
    // Target endpoint; validated to start with http:// or https:// in `new`.
    url: String,
    // TimeSeries accumulated since the last flush.
    batch: Vec<TimeSeries>,
    // Flush threshold: a write growing `batch` to this length triggers a send.
    batch_size: usize,
    // Optional retry policy for failed POSTs; `None` means a single attempt.
    retry_policy: Option<RetryPolicy>,
    // Maximum time data may sit in `batch` before a write forces a flush;
    // `Duration::ZERO` disables time-based flushing.
    max_buffer_age: Duration,
    // Instant of the last flush attempt; basis for the age check.
    last_flush_at: Instant,
    // Whether the most recent `write` call ended in a flush.
    last_write_delivered: bool,
}
impl RemoteWriteSink {
    /// Creates a sink that POSTs remote-write batches to `url`.
    ///
    /// `batch_size` is the size-based flush threshold, `retry_policy` (if any)
    /// governs re-attempts of failed POSTs, and `max_buffer_age` bounds how
    /// long data may sit buffered (`Duration::ZERO` disables time-based
    /// flushing).
    ///
    /// # Errors
    /// Returns `SondaError::Sink` when `url` is not an http:// or https:// URL.
    pub fn new(
        url: &str,
        batch_size: usize,
        retry_policy: Option<RetryPolicy>,
        max_buffer_age: Duration,
    ) -> Result<Self, SondaError> {
        if !url.starts_with("http://") && !url.starts_with("https://") {
            return Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "invalid remote write URL '{}': must start with http:// or https://",
                    url
                ),
            )));
        }
        let client = ureq::AgentBuilder::new().build();
        Ok(Self {
            client,
            url: url.to_owned(),
            batch: Vec::with_capacity(batch_size),
            batch_size,
            retry_policy,
            max_buffer_age,
            last_flush_at: Instant::now(),
            last_write_delivered: false,
        })
    }

    /// Drains the buffer and delivers it as one snappy-compressed protobuf
    /// `WriteRequest`, applying the retry policy if one is configured.
    ///
    /// A no-op when the buffer is empty. Non-429 4xx responses are logged and
    /// swallowed — the server has permanently rejected this payload, so the
    /// batch is discarded and `Ok(())` returned. All other failures propagate;
    /// note the drained batch is dropped on those errors as well.
    fn send_batch(&mut self) -> Result<(), SondaError> {
        if self.batch.is_empty() {
            return Ok(());
        }
        // Reset the age timer at attempt time so a failing endpoint does not
        // make every subsequent write immediately re-trigger a time flush.
        self.last_flush_at = Instant::now();
        let write_request = WriteRequest {
            timeseries: std::mem::take(&mut self.batch),
        };
        let encoded_len = write_request.encoded_len();
        let mut proto_bytes = Vec::with_capacity(encoded_len);
        write_request.encode(&mut proto_bytes).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!("protobuf encode error: {e}")))
        })?;
        // Remote-write payloads are snappy block-compressed.
        let mut snappy_encoder = snap::raw::Encoder::new();
        let compressed = snappy_encoder.compress_vec(&proto_bytes).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!(
                "snappy compression error: {e}"
            )))
        })?;
        let result = match &self.retry_policy {
            Some(policy) => {
                // Clone the policy so the closure below can borrow `self`.
                let policy = policy.clone();
                policy.execute(|| self.do_post_checked(&compressed), Self::is_retryable)
            }
            None => self.do_post_checked(&compressed),
        };
        match &result {
            // InvalidInput marks a permanent 4xx rejection (see
            // `do_post_checked`); treat it as handled so the pipeline keeps
            // flowing instead of erroring on a batch that can never succeed.
            Err(SondaError::Sink(io_err)) if io_err.kind() == std::io::ErrorKind::InvalidInput => {
                Ok(())
            }
            _ => result,
        }
    }

    /// POSTs `body` and maps the HTTP status to a result: 2xx => `Ok`,
    /// non-429 4xx => `ErrorKind::InvalidInput` (permanent), anything else =>
    /// a generic (retryable) sink error.
    fn do_post_checked(&self, body: &[u8]) -> Result<(), SondaError> {
        let status = self.do_post(body)?;
        if (200..300).contains(&status) {
            return Ok(());
        }
        if (400..500).contains(&status) && status != 429 {
            // A client error other than 429 Too Many Requests: retrying the
            // same payload cannot succeed, so log and tag it as permanent.
            eprintln!(
                "sonda: remote_write sink: received HTTP {} from '{}'; discarding batch",
                status, self.url
            );
            return Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("HTTP {} from '{}'", status, self.url),
            )));
        }
        Err(SondaError::Sink(std::io::Error::other(format!(
            "HTTP {} from '{}'",
            status, self.url
        ))))
    }

    /// Classifies an error as transient (worth retrying) or permanent.
    fn is_retryable(err: &SondaError) -> bool {
        // `do_post_checked` tags permanent non-429 4xx rejections with
        // `ErrorKind::InvalidInput`; everything else produced by this sink
        // (5xx, 429, transport failures) is transient. Checking the error
        // kind — the same signal `send_batch` already keys on — is robust,
        // unlike scanning the message for "HTTP 4", which could match
        // unrelated text (e.g. a URL embedded in a transport-error message).
        match err {
            SondaError::Sink(io_err) => io_err.kind() != std::io::ErrorKind::InvalidInput,
            // Non-sink errors (encoding, etc.) are never retried.
            _ => false,
        }
    }

    /// Performs the raw HTTP POST with the Prometheus remote-write v1 headers.
    ///
    /// Returns the HTTP status code; ureq surfaces non-2xx responses as
    /// `Error::Status`, which is folded back into a plain status here so the
    /// caller owns all status handling. Transport-level failures become
    /// `ErrorKind::ConnectionRefused` sink errors.
    fn do_post(&self, body: &[u8]) -> Result<u16, SondaError> {
        let response = self
            .client
            .post(&self.url)
            .set("Content-Type", "application/x-protobuf")
            .set("Content-Encoding", "snappy")
            .set("X-Prometheus-Remote-Write-Version", "0.1.0")
            .send_bytes(body);
        match response {
            Ok(resp) => Ok(resp.status()),
            Err(ureq::Error::Status(code, _)) => Ok(code),
            Err(e) => Err(SondaError::Sink(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                format!("remote write to '{}' failed: {}", self.url, e),
            ))),
        }
    }
}
impl Sink for RemoteWriteSink {
    /// Buffers the length-prefixed `TimeSeries` in `data`, flushing when the
    /// batch-size or buffer-age threshold is reached.
    ///
    /// # Errors
    /// Propagates parse failures and (non-swallowed) delivery failures from
    /// `send_batch`.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
        // Pessimistically clear the flag first: the original code assigned it
        // only after a successful flush, so an early `?` return (parse error
        // or failed send) left a stale `true` from a previous write visible
        // through `last_write_delivered()`.
        self.last_write_delivered = false;
        let timeseries_list = parse_length_prefixed_timeseries(data)?;
        self.batch.extend(timeseries_list);
        let size_reached = self.batch.len() >= self.batch_size;
        // A zero max_buffer_age disables time-based flushing entirely.
        let age_reached =
            !self.max_buffer_age.is_zero() && self.last_flush_at.elapsed() >= self.max_buffer_age;
        if size_reached || age_reached {
            self.send_batch()?;
            self.last_write_delivered = true;
        }
        Ok(())
    }

    /// Forces delivery of any buffered data regardless of thresholds.
    fn flush(&mut self) -> Result<(), SondaError> {
        self.send_batch()
    }

    /// Reports whether the most recent `write` call ended in a successful flush.
    fn last_write_delivered(&self) -> bool {
        self.last_write_delivered
    }
}
#[cfg(test)]
mod tests {
    use std::io::{BufRead, BufReader, Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::thread;
    use super::*;
    use crate::encoder::remote_write::RemoteWriteEncoder;
    use crate::encoder::Encoder;
    use crate::model::metric::{Labels, MetricEvent};
    use crate::sink::{create_sink, Sink, SinkConfig};

    /// Binds a TCP listener on an ephemeral localhost port and returns it
    /// together with a remote-write URL pointing at it.
    fn mock_server_listener() -> (TcpListener, String) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind listener");
        let port = listener.local_addr().expect("local addr").port();
        let url = format!("http://127.0.0.1:{port}/api/v1/write");
        (listener, url)
    }

    /// Accepts exactly one connection, replies with the given HTTP `status`,
    /// and returns the captured request body bytes.
    fn accept_one_and_respond(listener: &TcpListener, status: u16) -> Vec<u8> {
        let (mut stream, _) = listener.accept().expect("accept");
        let body = read_http_request_body(&mut stream);
        let response =
            format!("HTTP/1.1 {status} OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
        stream.write_all(response.as_bytes()).ok();
        body
    }

    /// Minimal HTTP request reader: scans headers for Content-Length, then
    /// reads exactly that many body bytes.
    fn read_http_request_body(stream: &mut TcpStream) -> Vec<u8> {
        let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
        let mut content_length: usize = 0;
        loop {
            let mut line = String::new();
            reader.read_line(&mut line).expect("read header line");
            // A bare CRLF (or EOF) terminates the header section.
            if line == "\r\n" || line.is_empty() {
                break;
            }
            // Header names are case-insensitive.
            let lower = line.to_lowercase();
            if let Some(rest) = lower.strip_prefix("content-length:") {
                content_length = rest.trim().parse().unwrap_or(0);
            }
        }
        let mut body = vec![0u8; content_length];
        reader.read_exact(&mut body).expect("read body");
        body
    }

    /// Reverses the sink's wire format: snappy-decompress, then decode the
    /// protobuf `WriteRequest`.
    fn decode_write_request(body: &[u8]) -> WriteRequest {
        let proto_bytes = snap::raw::Decoder::new()
            .decompress_vec(body)
            .expect("snappy decompress");
        WriteRequest::decode(proto_bytes.as_slice()).expect("protobuf decode")
    }

    /// Encodes a single metric event (fixed `host=server1` label) into the
    /// length-prefixed byte format the sink's `write` expects.
    fn encode_one(name: &str, value: f64) -> Vec<u8> {
        let labels = Labels::from_pairs(&[("host", "server1")]).expect("valid labels");
        let event = MetricEvent::new(name.to_string(), value, labels).expect("valid metric name");
        let mut buf = Vec::new();
        RemoteWriteEncoder::new()
            .encode_metric(&event, &mut buf)
            .expect("encode ok");
        buf
    }

    #[test]
    fn new_with_http_url_succeeds() {
        let result = RemoteWriteSink::new(
            "http://127.0.0.1:9999/api/v1/write",
            5,
            None,
            Duration::ZERO,
        );
        assert!(result.is_ok(), "http:// URL must be accepted");
    }

    #[test]
    fn new_with_invalid_scheme_returns_sink_error() {
        let result = RemoteWriteSink::new("ftp://example.com/write", 5, None, Duration::ZERO);
        assert!(result.is_err(), "non-http URL must be rejected");
        assert!(matches!(result.err().unwrap(), SondaError::Sink(_)));
    }

    #[test]
    fn remote_write_sink_is_send_and_sync() {
        // Compile-time assertion: fails to build if the sink loses Send/Sync.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<RemoteWriteSink>();
    }

    #[test]
    fn write_below_batch_size_does_not_trigger_flush() {
        let (listener, url) = mock_server_listener();
        let mut sink =
            RemoteWriteSink::new(&url, 100, None, Duration::ZERO).expect("construct sink");
        sink.write(&encode_one("cpu", 1.0)).expect("write");
        // Non-blocking accept: an Err here means no connection was attempted.
        listener.set_nonblocking(true).expect("set non-blocking");
        assert!(
            listener.accept().is_err(),
            "no request should have been sent below batch_size"
        );
    }

    #[test]
    fn explicit_flush_sends_buffered_data() {
        let (listener, url) = mock_server_listener();
        // Serve one request on a background thread while the sink flushes.
        let handle = thread::spawn(move || accept_one_and_respond(&listener, 200));
        let mut sink =
            RemoteWriteSink::new(&url, 10_000, None, Duration::ZERO).expect("construct sink");
        sink.write(&encode_one("cpu", 1.0)).expect("write");
        sink.flush().expect("flush");
        let body = handle.join().expect("mock server thread panicked");
        let request = decode_write_request(&body);
        assert_eq!(request.timeseries.len(), 1, "flush must deliver the batch");
    }

    #[test]
    fn last_write_delivered_is_false_when_write_only_buffers() {
        let (listener, url) = mock_server_listener();
        let mut sink =
            RemoteWriteSink::new(&url, 100, None, Duration::ZERO).expect("construct sink");
        sink.write(&encode_one("cpu", 1.0)).expect("write buffers");
        assert!(
            !sink.last_write_delivered(),
            "a write that only buffers must report last_write_delivered() == false"
        );
        listener.set_nonblocking(true).expect("set non-blocking");
        assert!(listener.accept().is_err(), "no flush should have fired");
    }

    #[test]
    fn last_write_delivered_is_true_when_write_triggers_flush() {
        let (listener, url) = mock_server_listener();
        let handle = thread::spawn(move || accept_one_and_respond(&listener, 200));
        // batch_size == 1 makes the very first write flush.
        let mut sink = RemoteWriteSink::new(&url, 1, None, Duration::ZERO).expect("construct sink");
        sink.write(&encode_one("cpu", 1.0))
            .expect("write triggers flush");
        handle.join().expect("mock server thread panicked");
        assert!(
            sink.last_write_delivered(),
            "a write that triggers a successful flush must report last_write_delivered() == true"
        );
    }

    #[test]
    fn time_based_flush_fires_when_buffer_age_exceeded() {
        let (listener, url) = mock_server_listener();
        let handle = thread::spawn(move || accept_one_and_respond(&listener, 200));
        // Huge batch_size so only the 50ms age threshold can trigger a flush.
        let mut sink = RemoteWriteSink::new(&url, 10_000, None, Duration::from_millis(50))
            .expect("construct sink");
        sink.write(&encode_one("first", 1.0)).expect("write 1");
        thread::sleep(Duration::from_millis(200));
        sink.write(&encode_one("second", 2.0)).expect("write 2");
        let body = handle.join().expect("mock server thread panicked");
        let request = decode_write_request(&body);
        assert_eq!(
            request.timeseries.len(),
            2,
            "time-based flush must deliver both buffered TimeSeries"
        );
    }

    #[test]
    fn zero_max_buffer_age_disables_time_based_flush() {
        let (listener, url) = mock_server_listener();
        let mut sink =
            RemoteWriteSink::new(&url, 10_000, None, Duration::ZERO).expect("construct sink");
        sink.write(&encode_one("first", 1.0)).expect("write 1");
        // Sleep longer than any plausible age threshold; ZERO must still not fire.
        thread::sleep(Duration::from_millis(150));
        sink.write(&encode_one("second", 2.0)).expect("write 2");
        listener.set_nonblocking(true).expect("set non-blocking");
        assert!(
            listener.accept().is_err(),
            "zero max_buffer_age must disable time-based flush"
        );
    }

    #[test]
    fn size_triggered_flush_resets_the_buffer_age_timer() {
        let (listener, url) = mock_server_listener();
        let handle = thread::spawn(move || accept_one_and_respond(&listener, 200));
        let mut sink =
            RemoteWriteSink::new(&url, 2, None, Duration::from_secs(60)).expect("construct sink");
        sink.write(&encode_one("a", 1.0)).expect("write 1");
        sink.write(&encode_one("b", 2.0)).expect("write 2");
        let body = handle.join().expect("mock server thread panicked");
        let request = decode_write_request(&body);
        assert_eq!(
            request.timeseries.len(),
            2,
            "size-triggered flush must deliver the full batch"
        );
        // The mock server only accepts one connection; this write would hang or
        // fail if the size flush above had not reset the age timer.
        sink.write(&encode_one("c", 3.0))
            .expect("partial write after a size flush must not time-flush immediately");
    }

    #[test]
    fn create_sink_remote_write_with_valid_url_returns_ok() {
        let config = SinkConfig::RemoteWrite {
            url: "http://127.0.0.1:19999/api/v1/write".to_string(),
            batch_size: None,
            max_buffer_age: None,
            retry: None,
        };
        assert!(create_sink(&config, None).is_ok());
    }

    #[test]
    fn create_sink_remote_write_with_invalid_max_buffer_age_returns_err() {
        let config = SinkConfig::RemoteWrite {
            url: "http://127.0.0.1:19999/api/v1/write".to_string(),
            batch_size: None,
            max_buffer_age: Some("garbage".to_string()),
            retry: None,
        };
        assert!(
            create_sink(&config, None).is_err(),
            "invalid max_buffer_age must cause the factory to fail"
        );
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_remote_write_deserializes_with_max_buffer_age() {
        let yaml = r#"
type: remote_write
url: "http://localhost:8428/api/v1/write"
max_buffer_age: 10s
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::RemoteWrite { max_buffer_age, .. } => {
                assert_eq!(max_buffer_age.as_deref(), Some("10s"));
            }
            other => panic!("expected RemoteWrite variant, got {other:?}"),
        }
    }

    #[cfg(feature = "config")]
    #[test]
    fn sink_config_remote_write_max_buffer_age_defaults_to_none() {
        let yaml = "type: remote_write\nurl: \"http://localhost:8428/api/v1/write\"";
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
        match config {
            SinkConfig::RemoteWrite { max_buffer_age, .. } => {
                assert!(
                    max_buffer_age.is_none(),
                    "max_buffer_age should default to None"
                );
            }
            other => panic!("expected RemoteWrite variant, got {other:?}"),
        }
    }
}