use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::sink::retry::RetryPolicy;
use crate::sink::Sink;
use crate::SondaError;
pub const DEFAULT_BATCH_SIZE: usize = 5;
pub struct LokiSink {
client: ureq::Agent,
url: String,
labels: HashMap<String, String>,
batch_size: usize,
batch: Vec<(String, String)>,
retry_policy: Option<RetryPolicy>,
}
impl LokiSink {
pub fn new(
url: String,
labels: HashMap<String, String>,
batch_size: usize,
retry_policy: Option<RetryPolicy>,
) -> Result<Self, SondaError> {
if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(SondaError::Sink(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"invalid Loki URL '{}': must start with http:// or https://",
url
),
)));
}
let client = ureq::AgentBuilder::new().build();
Ok(Self {
client,
url,
labels,
batch_size,
batch: Vec::with_capacity(batch_size),
retry_policy,
})
}
fn build_envelope(&self) -> String {
let stream_labels = self
.labels
.iter()
.map(|(k, v)| format!("\"{}\":\"{}\"", escape_json(k), escape_json(v)))
.collect::<Vec<_>>()
.join(",");
let values = self
.batch
.iter()
.map(|(ts, line)| format!("[\"{}\",\"{}\"]", ts, escape_json(line)))
.collect::<Vec<_>>()
.join(",");
format!(
"{{\"streams\":[{{\"stream\":{{{}}},\"values\":[{}]}}]}}",
stream_labels, values
)
}
fn flush_batch(&mut self) -> Result<(), SondaError> {
if self.batch.is_empty() {
return Ok(());
}
let push_url = format!("{}/loki/api/v1/push", self.url);
let body = self.build_envelope();
let result = match &self.retry_policy {
Some(policy) => {
let policy = policy.clone();
let client = &self.client;
policy.execute(
|| Self::do_post_checked(client, &push_url, &body),
Self::is_retryable,
)
}
None => Self::do_post_checked(&self.client, &push_url, &body),
};
self.batch.clear();
match &result {
Err(SondaError::Sink(io_err)) if io_err.kind() == std::io::ErrorKind::InvalidInput => {
Ok(())
}
_ => result,
}
}
fn do_post_checked(client: &ureq::Agent, push_url: &str, body: &str) -> Result<(), SondaError> {
let status = Self::do_post(client, push_url, body)?;
if (200..300).contains(&status) {
return Ok(());
}
if (400..500).contains(&status) && status != 429 {
eprintln!(
"sonda: loki sink: received HTTP {} from '{}'; discarding batch",
status, push_url
);
return Err(SondaError::Sink(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("HTTP {} from '{}'", status, push_url),
)));
}
Err(SondaError::Sink(std::io::Error::other(format!(
"HTTP {} from '{}'",
status, push_url
))))
}
fn do_post(client: &ureq::Agent, push_url: &str, body: &str) -> Result<u16, SondaError> {
let response = client
.post(push_url)
.set("Content-Type", "application/json")
.send_string(body);
match response {
Ok(resp) => Ok(resp.status()),
Err(ureq::Error::Status(code, _)) => Ok(code),
Err(e) => Err(SondaError::Sink(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Loki push to '{}' failed: {}", push_url, e),
))),
}
}
fn is_retryable(err: &SondaError) -> bool {
if let SondaError::Sink(io_err) = err {
if io_err.kind() == std::io::ErrorKind::InvalidInput {
return false;
}
return true;
}
false
}
}
impl Sink for LokiSink {
fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
let ts_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
.to_string();
let line = String::from_utf8_lossy(data);
let line = line.trim_end_matches('\n').to_string();
self.batch.push((ts_ns, line));
if self.batch.len() >= self.batch_size {
self.flush_batch()?;
}
Ok(())
}
fn flush(&mut self) -> Result<(), SondaError> {
self.flush_batch()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use super::*;
use crate::sink::{create_sink, SinkConfig};
fn mock_loki_listener() -> (TcpListener, String) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind listener");
let port = listener.local_addr().expect("local addr").port();
let url = format!("http://127.0.0.1:{port}");
(listener, url)
}
fn accept_one_and_respond(listener: &TcpListener, status: u16) -> Vec<u8> {
let (mut stream, _) = listener.accept().expect("accept connection");
let body = read_http_body(&mut stream);
let reason = if status < 300 { "OK" } else { "Error" };
let resp =
format!("HTTP/1.1 {status} {reason}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
stream.write_all(resp.as_bytes()).ok();
body
}
fn read_http_body(stream: &mut TcpStream) -> Vec<u8> {
let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
let mut content_length: usize = 0;
loop {
let mut line = String::new();
reader.read_line(&mut line).expect("read header line");
if line == "\r\n" || line.is_empty() {
break;
}
let lower = line.to_lowercase();
if lower.starts_with("content-length:") {
let val = lower["content-length:".len()..].trim().to_string();
content_length = val.parse().unwrap_or(0);
}
}
let mut body = vec![0u8; content_length];
reader.read_exact(&mut body).expect("read body");
body
}
#[test]
fn new_with_http_url_succeeds() {
let result = LokiSink::new(
"http://localhost:3100".to_string(),
HashMap::new(),
100,
None,
);
assert!(result.is_ok(), "http:// URL must be accepted");
}
#[test]
fn new_with_https_url_succeeds() {
let result = LokiSink::new(
"https://loki.example.com".to_string(),
HashMap::new(),
100,
None,
);
assert!(result.is_ok(), "https:// URL must be accepted");
}
#[test]
fn new_with_invalid_scheme_returns_sink_error() {
let result = LokiSink::new(
"ftp://loki.example.com".to_string(),
HashMap::new(),
100,
None,
);
assert!(result.is_err(), "non-http:// URL must be rejected");
assert!(
matches!(result.err().unwrap(), SondaError::Sink(_)),
"expected SondaError::Sink"
);
}
#[test]
fn new_with_bare_hostname_returns_sink_error() {
let result = LokiSink::new("loki.example.com".to_string(), HashMap::new(), 100, None);
assert!(result.is_err(), "URL without scheme must be rejected");
}
#[test]
fn new_with_empty_url_returns_sink_error() {
let result = LokiSink::new(String::new(), HashMap::new(), 100, None);
assert!(result.is_err(), "empty URL must be rejected");
}
#[test]
fn new_error_message_contains_the_bad_url() {
let bad_url = "not-a-url";
let result = LokiSink::new(bad_url.to_string(), HashMap::new(), 100, None);
let err = result.err().expect("should be Err");
let msg = err.to_string();
assert!(
msg.contains(bad_url),
"error message should contain the bad URL; got: {msg}"
);
}
#[test]
fn flush_produces_valid_loki_push_json_envelope() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut labels = HashMap::new();
labels.insert("job".to_string(), "sonda".to_string());
let mut sink = LokiSink::new(url, labels, 100, None).expect("construct sink");
sink.write(b"hello loki\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("valid UTF-8");
let parsed: serde_json::Value =
serde_json::from_str(&body).expect("envelope must be valid JSON");
let streams = parsed.get("streams").expect("must have 'streams' key");
let streams_arr = streams.as_array().expect("'streams' must be an array");
assert_eq!(streams_arr.len(), 1, "exactly one stream expected");
let stream_obj = &streams_arr[0];
assert!(
stream_obj.get("stream").is_some(),
"stream object must have 'stream' key"
);
assert!(
stream_obj.get("values").is_some(),
"stream object must have 'values' key"
);
let values = stream_obj["values"]
.as_array()
.expect("'values' must be array");
assert_eq!(values.len(), 1, "exactly one value expected");
let pair = values[0].as_array().expect("each value must be an array");
assert_eq!(pair.len(), 2, "each value must be [timestamp, log_line]");
let ts = pair[0].as_str().expect("timestamp must be a string");
assert!(!ts.is_empty(), "timestamp must not be empty");
ts.parse::<u128>()
.expect("timestamp must be numeric nanoseconds");
let log_line = pair[1].as_str().expect("log line must be a string");
assert_eq!(log_line, "hello loki", "log line content must match");
}
#[test]
fn labels_appear_in_stream_object_of_push_envelope() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut labels = HashMap::new();
labels.insert("job".to_string(), "sonda".to_string());
labels.insert("env".to_string(), "dev".to_string());
let mut sink = LokiSink::new(url, labels, 100, None).expect("construct sink");
sink.write(b"test\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8 body");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let stream = &parsed["streams"][0]["stream"];
assert_eq!(
stream["job"].as_str(),
Some("sonda"),
"'job' label must be present"
);
assert_eq!(
stream["env"].as_str(),
Some("dev"),
"'env' label must be present"
);
}
#[test]
fn empty_labels_produce_empty_stream_object() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"line\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let stream = &parsed["streams"][0]["stream"];
assert!(
stream.as_object().map(|m| m.is_empty()).unwrap_or(false),
"stream object must be empty when no labels configured"
);
}
#[test]
fn write_below_batch_size_does_not_trigger_http_call() {
let (listener, url) = mock_loki_listener();
let mut sink = LokiSink::new(url, HashMap::new(), 50, None).expect("construct sink");
for i in 0..49 {
sink.write(format!("line {i}\n").as_bytes())
.expect("write should buffer");
}
listener.set_nonblocking(true).expect("set non-blocking");
let accepted = listener.accept();
assert!(
accepted.is_err(),
"no HTTP request should fire before batch_size is reached"
);
}
#[test]
fn write_at_batch_size_triggers_exactly_one_http_call() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 50, None).expect("construct sink");
for i in 0..50 {
sink.write(format!("line {i}\n").as_bytes()).expect("write");
}
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let values = &parsed["streams"][0]["values"];
assert_eq!(
values.as_array().map(|v| v.len()),
Some(50),
"all 50 lines must be in the flushed batch"
);
}
#[test]
fn explicit_flush_sends_partial_batch() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"alpha\n").expect("write 1");
sink.write(b"beta\n").expect("write 2");
sink.write(b"gamma\n").expect("write 3");
sink.flush().expect("explicit flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let values = parsed["streams"][0]["values"]
.as_array()
.expect("values array");
assert_eq!(values.len(), 3, "all 3 partial lines must be flushed");
}
#[test]
fn flush_is_idempotent() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"once\n").expect("write");
sink.flush().expect("first flush sends data");
let _body = handle.join().expect("mock server thread panicked");
assert!(sink.flush().is_ok(), "second flush must return Ok");
}
#[test]
fn flush_on_empty_batch_is_a_noop() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let port = listener.local_addr().expect("addr").port();
drop(listener);
let url = format!("http://127.0.0.1:{port}");
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
assert!(
sink.flush().is_ok(),
"flush on empty batch must return Ok without making a network call"
);
}
#[test]
fn trailing_newline_is_stripped_from_log_lines() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"my log line\n").expect("write with newline");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let log_line = parsed["streams"][0]["values"][0][1]
.as_str()
.expect("log line string");
assert_eq!(
log_line, "my log line",
"trailing newline must be stripped from the log line"
);
}
#[test]
fn five_xx_response_returns_sink_error() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 500));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"line\n").expect("write buffered");
let result = sink.flush();
handle.join().expect("mock server thread panicked");
assert!(result.is_err(), "5xx response must return Err");
assert!(
matches!(result.err().unwrap(), SondaError::Sink(_)),
"expected SondaError::Sink"
);
}
#[test]
fn four_xx_response_warns_and_discards_batch_returning_ok() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 400));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"line\n").expect("write buffered");
let result = sink.flush();
handle.join().expect("mock server thread panicked");
assert!(
result.is_ok(),
"4xx response must return Ok (warn-and-continue)"
);
}
#[test]
fn flush_to_refused_port_returns_sink_error() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let port = listener.local_addr().expect("addr").port();
drop(listener);
let url = format!("http://127.0.0.1:{port}");
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"line\n").expect("write buffered");
let result = sink.flush();
assert!(result.is_err(), "connection refused must return Err");
assert!(
matches!(result.err().unwrap(), SondaError::Sink(_)),
"expected SondaError::Sink"
);
}
#[test]
fn log_line_with_double_quotes_is_properly_escaped() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut sink = LokiSink::new(url, HashMap::new(), 100, None).expect("construct sink");
sink.write(b"msg=\"hello world\"").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value =
serde_json::from_str(&body).expect("must parse as valid JSON after escaping");
let log_line = parsed["streams"][0]["values"][0][1]
.as_str()
.expect("log line");
assert_eq!(log_line, r#"msg="hello world""#);
}
#[test]
fn label_value_with_special_characters_is_properly_escaped() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let mut labels = HashMap::new();
labels.insert("app".to_string(), r#"my "special" app"#.to_string());
let mut sink = LokiSink::new(url, labels, 100, None).expect("construct sink");
sink.write(b"line\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value =
serde_json::from_str(&body).expect("envelope with escaped labels must be valid JSON");
let app_label = parsed["streams"][0]["stream"]["app"]
.as_str()
.expect("app label");
assert_eq!(app_label, r#"my "special" app"#);
}
#[test]
fn batch_is_cleared_after_auto_flush() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || {
let first = accept_one_and_respond(&listener, 204);
let second = accept_one_and_respond(&listener, 204);
(first, second)
});
let mut sink = LokiSink::new(url, HashMap::new(), 2, None).expect("construct sink");
sink.write(b"line 0\n").expect("write 0");
sink.write(b"line 1\n").expect("write 1");
sink.write(b"line 2\n").expect("write 2");
sink.write(b"line 3\n").expect("write 3");
let (first_body, second_body) = handle.join().expect("mock server thread panicked");
let p1: serde_json::Value =
serde_json::from_str(&String::from_utf8(first_body).expect("UTF-8"))
.expect("first batch JSON");
let p2: serde_json::Value =
serde_json::from_str(&String::from_utf8(second_body).expect("UTF-8"))
.expect("second batch JSON");
assert_eq!(
p1["streams"][0]["values"].as_array().map(|v| v.len()),
Some(2),
"first batch must contain exactly 2 entries"
);
assert_eq!(
p2["streams"][0]["values"].as_array().map(|v| v.len()),
Some(2),
"second batch must contain exactly 2 entries"
);
}
#[cfg(feature = "config")]
#[test]
fn sink_config_loki_deserializes_with_url_only() {
let yaml = "type: loki\nurl: \"http://localhost:3100\"";
let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
match config {
SinkConfig::Loki {
ref url,
batch_size,
..
} => {
assert_eq!(url, "http://localhost:3100");
assert!(batch_size.is_none(), "batch_size should default to None");
}
other => panic!("expected Loki variant, got {other:?}"),
}
}
#[cfg(feature = "config")]
#[test]
fn sink_config_loki_deserializes_with_batch_size() {
let yaml = r#"
type: loki
url: "http://localhost:3100"
batch_size: 50
"#;
let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("should deserialize");
match config {
SinkConfig::Loki {
ref url,
batch_size,
..
} => {
assert_eq!(url, "http://localhost:3100");
assert_eq!(batch_size, Some(50));
}
other => panic!("expected Loki variant, got {other:?}"),
}
}
#[cfg(feature = "config")]
#[test]
fn sink_config_loki_requires_url_field() {
let yaml = "type: loki";
let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
assert!(
result.is_err(),
"loki variant without url must fail deserialization"
);
}
#[test]
fn create_sink_loki_with_valid_url_returns_ok() {
let config = SinkConfig::Loki {
url: "http://localhost:3100".to_string(),
batch_size: None,
retry: None,
};
assert!(
create_sink(&config, None).is_ok(),
"factory must return Ok for valid loki config"
);
}
#[test]
fn create_sink_loki_with_labels_passes_them_to_sink() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let config = SinkConfig::Loki {
url,
batch_size: None,
retry: None,
};
let mut labels = HashMap::new();
labels.insert("job".to_string(), "sonda".to_string());
let mut sink = create_sink(&config, Some(&labels)).expect("factory ok");
sink.write(b"test\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
assert_eq!(
parsed["streams"][0]["stream"]["job"].as_str(),
Some("sonda"),
"labels passed to create_sink must appear in Loki stream"
);
}
#[test]
fn create_sink_loki_with_none_labels_uses_empty_labels() {
let (listener, url) = mock_loki_listener();
let handle = thread::spawn(move || accept_one_and_respond(&listener, 204));
let config = SinkConfig::Loki {
url,
batch_size: None,
retry: None,
};
let mut sink = create_sink(&config, None).expect("factory ok");
sink.write(b"test\n").expect("write");
sink.flush().expect("flush");
let body_bytes = handle.join().expect("mock server thread panicked");
let body = String::from_utf8(body_bytes).expect("UTF-8");
let parsed: serde_json::Value = serde_json::from_str(&body).expect("valid JSON");
let stream = &parsed["streams"][0]["stream"];
assert!(
stream.as_object().map(|m| m.is_empty()).unwrap_or(false),
"None labels must produce empty stream object"
);
}
#[test]
fn default_batch_size_is_5() {
assert_eq!(DEFAULT_BATCH_SIZE, 5);
}
#[test]
fn create_sink_loki_with_no_batch_size_uses_default() {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
let port = listener.local_addr().expect("addr").port();
drop(listener);
let url = format!("http://127.0.0.1:{port}");
let config = SinkConfig::Loki {
url,
batch_size: None,
retry: None,
};
let mut sink = create_sink(&config, None).expect("factory ok");
for i in 0..(DEFAULT_BATCH_SIZE - 1) as u32 {
sink.write(format!("line {i}\n").as_bytes())
.expect("write must succeed below batch_size");
}
}
#[test]
fn create_sink_loki_with_invalid_url_returns_err() {
let config = SinkConfig::Loki {
url: "not-http://bad".to_string(),
batch_size: None,
retry: None,
};
let result = create_sink(&config, None);
assert!(result.is_err(), "invalid URL must cause factory to fail");
}
#[test]
fn loki_sink_is_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<LokiSink>();
}
#[cfg(feature = "config")]
#[test]
fn loki_json_lines_example_yaml_deserializes_to_log_scenario_config() {
use crate::config::LogScenarioConfig;
let yaml = r#"
name: app_logs_loki
rate: 10
duration: 60s
generator:
type: template
templates:
- message: "Request from {ip} to {endpoint}"
field_pools:
ip: ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
endpoint: ["/api/v1/health", "/api/v1/metrics", "/api/v1/logs"]
severity_weights:
info: 0.7
warn: 0.2
error: 0.1
labels:
job: sonda
env: dev
encoder:
type: json_lines
sink:
type: loki
url: http://localhost:3100
batch_size: 50
"#;
let config: LogScenarioConfig =
serde_yaml_ng::from_str(yaml).expect("loki-json-lines.yaml must deserialize correctly");
assert_eq!(config.name, "app_logs_loki");
assert!((config.rate - 10.0).abs() < f64::EPSILON);
let labels = config.labels.as_ref().expect("labels must be present");
assert_eq!(labels.get("job").map(String::as_str), Some("sonda"));
assert_eq!(labels.get("env").map(String::as_str), Some("dev"));
match &config.sink {
SinkConfig::Loki {
url, batch_size, ..
} => {
assert_eq!(url, "http://localhost:3100");
assert_eq!(batch_size, &Some(50));
}
other => panic!("expected Loki sink, got {other:?}"),
}
}
}
fn escape_json(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for c in s.chars() {
match c {
'\\' => out.push_str("\\\\"),
'"' => out.push_str("\\\""),
'\n' => out.push_str("\\n"),
'\r' => out.push_str("\\r"),
'\t' => out.push_str("\\t"),
c if (c as u32) < 32 => {
out.push_str(&format!("\\u{:04x}", c as u32));
}
c => out.push(c),
}
}
out
}