use std::marker::PhantomData;
use bytes::Buf;
use prost::Message;
use tokio::runtime::Runtime;
use tonic::client::Grpc;
use tonic::codec::{Codec, Decoder, Encoder as TonicEncoder};
use tonic::transport::Channel;
use tonic::Status;
use crate::encoder::otlp::{
self, ExportLogsServiceRequest, ExportLogsServiceResponse, ExportMetricsServiceRequest,
ExportMetricsServiceResponse, InstrumentationScope, KeyValue, LogRecord, Metric, Resource,
ResourceLogs, ResourceMetrics, ScopeLogs, ScopeMetrics,
};
use crate::sink::retry::RetryPolicy;
use crate::sink::Sink;
use crate::SondaError;
/// Default number of records buffered before a batch is exported.
pub const DEFAULT_BATCH_SIZE: usize = 5;
/// Full gRPC method path of the OTLP metrics export RPC.
const METRICS_EXPORT_PATH: &str = "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export";
/// Full gRPC method path of the OTLP logs export RPC.
const LOGS_EXPORT_PATH: &str = "/opentelemetry.proto.collector.logs.v1.LogsService/Export";
/// A `tonic` [`Codec`] that (de)serializes request/response message pairs
/// with `prost`, pairing [`OtlpProstEncoder`] and [`OtlpProstDecoder`].
///
/// `PhantomData` carries the message types without storing any data.
#[derive(Debug, Clone)]
struct OtlpCodec<T, U>(PhantomData<(T, U)>);
impl<T, U> Default for OtlpCodec<T, U> {
    fn default() -> Self {
        Self(PhantomData)
    }
}
impl<T, U> Codec for OtlpCodec<T, U>
where
    T: Message + 'static,
    U: Message + Default + 'static,
{
    // Requests of type `T` are encoded; responses of type `U` are decoded.
    type Encode = T;
    type Decode = U;
    type Encoder = OtlpProstEncoder<T>;
    type Decoder = OtlpProstDecoder<U>;
    fn encoder(&mut self) -> Self::Encoder {
        OtlpProstEncoder(PhantomData)
    }
    fn decoder(&mut self) -> Self::Decoder {
        OtlpProstDecoder(PhantomData)
    }
}
/// Encodes a `prost` message into tonic's outgoing gRPC frame buffer.
#[derive(Debug)]
struct OtlpProstEncoder<T>(PhantomData<T>);
impl<T: Message + 'static> TonicEncoder for OtlpProstEncoder<T> {
    type Item = T;
    type Error = Status;
    fn encode(
        &mut self,
        item: Self::Item,
        dst: &mut tonic::codec::EncodeBuf<'_>,
    ) -> Result<(), Self::Error> {
        // Any prost encode failure is surfaced as an INTERNAL gRPC status
        // with the underlying error text preserved.
        item.encode(dst)
            .map_err(|e| Status::internal(format!("protobuf encode error: {e}")))
    }
}
/// Decodes a single `prost` message from tonic's incoming gRPC frame buffer.
#[derive(Debug)]
struct OtlpProstDecoder<T>(PhantomData<T>);
impl<T: Message + Default + 'static> Decoder for OtlpProstDecoder<T> {
    type Item = T;
    type Error = Status;
    fn decode(
        &mut self,
        src: &mut tonic::codec::DecodeBuf<'_>,
    ) -> Result<Option<Self::Item>, Self::Error> {
        // Drain the whole buffer: one frame is expected to carry exactly one
        // message for a unary call.
        let buf = src.copy_to_bytes(src.remaining());
        if buf.is_empty() {
            // NOTE(review): an empty frame is treated as "no message". A
            // zero-length frame is also a valid encoding of a message whose
            // fields are all defaults — confirm callers never depend on
            // receiving such a message here.
            return Ok(None);
        }
        T::decode(buf)
            .map(Some)
            .map_err(|e| Status::internal(format!("protobuf decode error: {e}")))
    }
}
/// Which OTLP signal a sink exports; selects the RPC path and which batch
/// buffer is used. Deserializes from lowercase strings ("metrics"/"logs")
/// when the `config` feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "config", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "config", serde(rename_all = "lowercase"))]
pub enum OtlpSignalType {
    Metrics,
    Logs,
}
/// A [`Sink`] that batches OTLP metrics or logs and exports them to an
/// OpenTelemetry collector over gRPC.
pub struct OtlpGrpcSink {
    // Dedicated current-thread runtime used to drive async tonic calls
    // from the synchronous `Sink` interface.
    runtime: Runtime,
    channel: Channel,
    // Pending records; only the buffer matching `signal_type` is populated.
    metric_batch: Vec<Metric>,
    log_batch: Vec<LogRecord>,
    // Flush threshold: a batch is exported once it reaches this many items.
    batch_size: usize,
    signal_type: OtlpSignalType,
    // Attributes attached to the OTLP `Resource` of every export request.
    resource_attrs: Vec<KeyValue>,
    // Retained only for error-message context.
    endpoint: String,
    retry_policy: Option<RetryPolicy>,
}
impl OtlpGrpcSink {
/// Creates a sink that exports OTLP data to `endpoint` over gRPC.
///
/// Builds a private current-thread tokio runtime and eagerly connects the
/// tonic channel on it, so an invalid URL or unreachable collector fails
/// at construction time rather than on the first write.
///
/// # Errors
///
/// Returns [`SondaError::Sink`] if the runtime cannot be built, the
/// endpoint is not a valid URI, or the initial connection attempt fails.
pub fn new(
    endpoint: &str,
    signal_type: OtlpSignalType,
    batch_size: usize,
    resource_attrs: Vec<KeyValue>,
    retry_policy: Option<RetryPolicy>,
) -> Result<Self, SondaError> {
    // A current-thread runtime suffices: the sink performs one blocking
    // RPC at a time via `block_on`.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| {
            std::io::Error::other(format!(
                "otlp grpc sink: failed to build tokio runtime for '{}': {}",
                endpoint, e
            ))
        })
        .map_err(SondaError::Sink)?;
    let endpoint_str = endpoint.to_owned();
    let channel = runtime
        .block_on(async {
            Channel::from_shared(endpoint_str.clone())
                .map_err(|e| {
                    std::io::Error::other(format!(
                        "otlp grpc sink: invalid endpoint '{}': {}",
                        endpoint_str, e
                    ))
                })?
                // Eager connect: surfaces connection failures immediately,
                // mapped to ConnectionRefused so they are retry-classifiable.
                .connect()
                .await
                .map_err(|e| {
                    std::io::Error::new(
                        std::io::ErrorKind::ConnectionRefused,
                        format!(
                            "otlp grpc sink: failed to connect to '{}': {}",
                            endpoint_str, e
                        ),
                    )
                })
        })
        .map_err(SondaError::Sink)?;
    Ok(Self {
        runtime,
        channel,
        metric_batch: Vec::with_capacity(batch_size),
        log_batch: Vec::with_capacity(batch_size),
        batch_size,
        signal_type,
        resource_attrs,
        endpoint: endpoint.to_owned(),
        retry_policy,
    })
}
/// Builds the OTLP `Resource` attached to every export request, carrying
/// the attributes this sink was configured with.
fn build_resource(&self) -> Resource {
    let attributes = self.resource_attrs.clone();
    Resource { attributes }
}
/// Builds the instrumentation scope identifying this crate as the producer,
/// versioned from the Cargo package metadata at compile time.
fn build_scope() -> InstrumentationScope {
    let name = String::from("sonda");
    let version = String::from(env!("CARGO_PKG_VERSION"));
    InstrumentationScope { name, version }
}
fn flush_metrics(&mut self) -> Result<(), SondaError> {
if self.metric_batch.is_empty() {
return Ok(());
}
let metrics =
std::mem::replace(&mut self.metric_batch, Vec::with_capacity(self.batch_size));
match &self.retry_policy {
Some(policy) => {
let policy = policy.clone();
policy.execute(
|| {
let request = ExportMetricsServiceRequest {
resource_metrics: vec![ResourceMetrics {
resource: Some(self.build_resource()),
scope_metrics: vec![ScopeMetrics {
scope: Some(Self::build_scope()),
metrics: metrics.clone(),
}],
}],
};
self.send_grpc_unary::<
ExportMetricsServiceRequest,
ExportMetricsServiceResponse,
>(request, METRICS_EXPORT_PATH)
},
Self::is_retryable,
)
}
None => {
let request = ExportMetricsServiceRequest {
resource_metrics: vec![ResourceMetrics {
resource: Some(self.build_resource()),
scope_metrics: vec![ScopeMetrics {
scope: Some(Self::build_scope()),
metrics,
}],
}],
};
self.send_grpc_unary::<ExportMetricsServiceRequest, ExportMetricsServiceResponse>(
request,
METRICS_EXPORT_PATH,
)
}
}
}
fn flush_logs(&mut self) -> Result<(), SondaError> {
if self.log_batch.is_empty() {
return Ok(());
}
let log_records =
std::mem::replace(&mut self.log_batch, Vec::with_capacity(self.batch_size));
match &self.retry_policy {
Some(policy) => {
let policy = policy.clone();
policy.execute(
|| {
let request = ExportLogsServiceRequest {
resource_logs: vec![ResourceLogs {
resource: Some(self.build_resource()),
scope_logs: vec![ScopeLogs {
scope: Some(Self::build_scope()),
log_records: log_records.clone(),
}],
}],
};
self.send_grpc_unary::<ExportLogsServiceRequest, ExportLogsServiceResponse>(
request,
LOGS_EXPORT_PATH,
)
},
Self::is_retryable,
)
}
None => {
let request = ExportLogsServiceRequest {
resource_logs: vec![ResourceLogs {
resource: Some(self.build_resource()),
scope_logs: vec![ScopeLogs {
scope: Some(Self::build_scope()),
log_records,
}],
}],
};
self.send_grpc_unary::<ExportLogsServiceRequest, ExportLogsServiceResponse>(
request,
LOGS_EXPORT_PATH,
)
}
}
}
/// Heuristically classifies an error as transient and worth retrying.
///
/// Only `SondaError::Sink` I/O errors are ever retried; the decision is
/// based on the I/O error kind plus gRPC status text embedded in the
/// error message by [`Self::send_grpc_unary`].
fn is_retryable(err: &SondaError) -> bool {
    match err {
        SondaError::Sink(io_err) => {
            let kind = io_err.kind();
            if kind == std::io::ErrorKind::ConnectionRefused
                || kind == std::io::ErrorKind::BrokenPipe
            {
                return true;
            }
            // Fall back to matching well-known transient gRPC status names
            // and connectivity phrases in the message text.
            let msg = io_err.to_string();
            [
                "UNAVAILABLE",
                "DEADLINE_EXCEEDED",
                "RESOURCE_EXHAUSTED",
                "not ready",
                "connection",
            ]
            .iter()
            .any(|needle| msg.contains(needle))
        }
        _ => false,
    }
}
/// Performs one blocking unary gRPC call on the sink's private runtime.
///
/// `T` is the request message type, `U` the response type decoded by the
/// codec; `path` is the full gRPC method path (service + method). The
/// response payload is decoded but otherwise discarded.
///
/// # Errors
///
/// Returns [`SondaError::Sink`] wrapping a `ConnectionRefused` I/O error
/// when the channel reports it is not ready, or a `BrokenPipe` one when
/// the call itself fails; the gRPC status text is preserved in the message
/// (which is what [`Self::is_retryable`] inspects).
fn send_grpc_unary<T, U>(&mut self, request: T, path: &'static str) -> Result<(), SondaError>
where
    T: Message + 'static,
    U: Message + Default + 'static,
{
    // Clones are taken up front so the async block owns its captures.
    let channel = self.channel.clone();
    let endpoint = self.endpoint.clone();
    let result = self.runtime.block_on(async {
        let mut client = Grpc::new(channel);
        // Wait until the underlying service can accept a new request.
        client.ready().await.map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                format!("otlp grpc sink: service not ready at '{}': {}", endpoint, e),
            )
        })?;
        let grpc_path = http::uri::PathAndQuery::from_static(path);
        let codec: OtlpCodec<T, U> = OtlpCodec::default();
        let tonic_request = tonic::Request::new(request);
        // The `?` discards the successful response; only errors matter here.
        client
            .unary(tonic_request, grpc_path, codec)
            .await
            .map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::BrokenPipe,
                    format!("otlp grpc sink: gRPC call to '{}' failed: {}", endpoint, e),
                )
            })?;
        Ok::<(), std::io::Error>(())
    });
    result.map_err(SondaError::Sink)
}
}
impl Sink for OtlpGrpcSink {
    /// Parses `data` (length-prefixed protobuf records) into the batch for
    /// the configured signal type, exporting whenever the batch reaches
    /// `batch_size`.
    fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
        match self.signal_type {
            OtlpSignalType::Metrics => {
                self.metric_batch
                    .extend(otlp::parse_length_prefixed_metrics(data)?);
                if self.metric_batch.len() < self.batch_size {
                    return Ok(());
                }
                self.flush_metrics()
            }
            OtlpSignalType::Logs => {
                self.log_batch
                    .extend(otlp::parse_length_prefixed_log_records(data)?);
                if self.log_batch.len() < self.batch_size {
                    return Ok(());
                }
                self.flush_logs()
            }
        }
    }

    /// Exports any partially-filled batch for the active signal type.
    fn flush(&mut self) -> Result<(), SondaError> {
        if self.signal_type == OtlpSignalType::Metrics {
            self.flush_metrics()
        } else {
            self.flush_logs()
        }
    }
}
/// Unit tests covering the public constants, serde-based configuration
/// parsing (behind the `config` feature), OTLP request envelope encoding,
/// and constructor error handling. All tests run offline except the one
/// marked `#[ignore]`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::otlp::{
        self, any_value, metric, number_data_point, AnyValue, Gauge, Metric, NumberDataPoint,
    };
    use crate::sink::SinkConfig;
    #[test]
    fn default_batch_size_is_5() {
        assert_eq!(DEFAULT_BATCH_SIZE, 5);
    }
    #[test]
    fn signal_type_is_cloneable_and_debuggable() {
        let st = OtlpSignalType::Metrics;
        let cloned = st;
        assert_eq!(cloned, OtlpSignalType::Metrics);
        let s = format!("{st:?}");
        assert!(s.contains("Metrics"));
    }
    // `serde(rename_all = "lowercase")` should map "metrics"/"logs" strings
    // to the enum variants.
    #[cfg(feature = "config")]
    #[test]
    fn signal_type_deserializes_metrics() {
        let json = "\"metrics\"";
        let st: OtlpSignalType = serde_json::from_str(json).expect("deser ok");
        assert_eq!(st, OtlpSignalType::Metrics);
    }
    #[cfg(feature = "config")]
    #[test]
    fn signal_type_deserializes_logs() {
        let json = "\"logs\"";
        let st: OtlpSignalType = serde_json::from_str(json).expect("deser ok");
        assert_eq!(st, OtlpSignalType::Logs);
    }
    // Compile-time check that the sink can cross thread boundaries.
    #[test]
    fn otlp_grpc_sink_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<OtlpGrpcSink>();
    }
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_otlp_grpc_deserializes_with_all_fields() {
        let yaml = r#"
type: otlp_grpc
endpoint: "http://localhost:4317"
signal_type: metrics
batch_size: 50
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("deser ok");
        match config {
            SinkConfig::OtlpGrpc {
                endpoint,
                signal_type,
                batch_size,
                ..
            } => {
                assert_eq!(endpoint, "http://localhost:4317");
                assert_eq!(signal_type, OtlpSignalType::Metrics);
                assert_eq!(batch_size, Some(50));
            }
            other => panic!("expected OtlpGrpc, got {other:?}"),
        }
    }
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_otlp_grpc_batch_size_is_optional() {
        let yaml = r#"
type: otlp_grpc
endpoint: "http://localhost:4317"
signal_type: logs
"#;
        let config: SinkConfig = serde_yaml_ng::from_str(yaml).expect("deser ok");
        match config {
            SinkConfig::OtlpGrpc {
                batch_size,
                signal_type,
                ..
            } => {
                assert!(batch_size.is_none());
                assert_eq!(signal_type, OtlpSignalType::Logs);
            }
            other => panic!("expected OtlpGrpc, got {other:?}"),
        }
    }
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_otlp_grpc_requires_endpoint() {
        let yaml = "type: otlp_grpc\nsignal_type: metrics";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "otlp_grpc without endpoint should fail deserialization"
        );
    }
    #[cfg(feature = "config")]
    #[test]
    fn sink_config_otlp_grpc_requires_signal_type() {
        let yaml = "type: otlp_grpc\nendpoint: \"http://localhost:4317\"";
        let result: Result<SinkConfig, _> = serde_yaml_ng::from_str(yaml);
        assert!(
            result.is_err(),
            "otlp_grpc without signal_type should fail deserialization"
        );
    }
    #[test]
    fn sink_config_otlp_grpc_is_cloneable_and_debuggable() {
        let config = SinkConfig::OtlpGrpc {
            endpoint: "http://localhost:4317".to_string(),
            signal_type: OtlpSignalType::Metrics,
            batch_size: Some(100),
            retry: None,
        };
        let cloned = config.clone();
        let s = format!("{cloned:?}");
        assert!(s.contains("OtlpGrpc"));
        assert!(s.contains("4317"));
    }
    // Round-trips a metrics export request through prost encode/decode to
    // verify the resource -> scope -> metric nesting survives.
    #[test]
    fn export_metrics_request_wraps_metrics_correctly() {
        let metric = Metric {
            name: "test_gauge".to_string(),
            description: String::new(),
            unit: String::new(),
            data: Some(metric::Data::Gauge(Gauge {
                data_points: vec![NumberDataPoint {
                    attributes: vec![],
                    time_unix_nano: 1_700_000_000_000_000_000,
                    value: Some(number_data_point::Value::AsDouble(42.0)),
                }],
            })),
        };
        let attrs = vec![otlp::KeyValue {
            key: "service.name".to_string(),
            value: Some(AnyValue {
                value: Some(any_value::Value::StringValue("sonda".to_string())),
            }),
        }];
        let req = ExportMetricsServiceRequest {
            resource_metrics: vec![ResourceMetrics {
                resource: Some(Resource { attributes: attrs }),
                scope_metrics: vec![ScopeMetrics {
                    scope: Some(InstrumentationScope {
                        name: "sonda".to_string(),
                        version: "test".to_string(),
                    }),
                    metrics: vec![metric],
                }],
            }],
        };
        let mut buf = Vec::new();
        req.encode(&mut buf).expect("encode");
        let decoded = ExportMetricsServiceRequest::decode(buf.as_slice()).expect("decode");
        assert_eq!(decoded.resource_metrics.len(), 1);
        assert_eq!(
            decoded.resource_metrics[0].scope_metrics[0].metrics.len(),
            1
        );
        assert_eq!(
            decoded.resource_metrics[0].scope_metrics[0].metrics[0].name,
            "test_gauge"
        );
    }
    // Same round-trip check for the logs envelope.
    #[test]
    fn export_logs_request_wraps_log_records_correctly() {
        let record = otlp::LogRecord {
            time_unix_nano: 1_700_000_000_000_000_000,
            severity_number: 9,
            severity_text: "INFO".to_string(),
            body: Some(AnyValue {
                value: Some(any_value::Value::StringValue("hello".to_string())),
            }),
            attributes: vec![],
        };
        let req = ExportLogsServiceRequest {
            resource_logs: vec![ResourceLogs {
                resource: Some(Resource { attributes: vec![] }),
                scope_logs: vec![ScopeLogs {
                    scope: Some(InstrumentationScope {
                        name: "sonda".to_string(),
                        version: "test".to_string(),
                    }),
                    log_records: vec![record],
                }],
            }],
        };
        let mut buf = Vec::new();
        req.encode(&mut buf).expect("encode");
        let decoded = ExportLogsServiceRequest::decode(buf.as_slice()).expect("decode");
        assert_eq!(decoded.resource_logs.len(), 1);
        assert_eq!(decoded.resource_logs[0].scope_logs[0].log_records.len(), 1);
        assert_eq!(
            decoded.resource_logs[0].scope_logs[0].log_records[0].severity_text,
            "INFO"
        );
    }
    // Needs a real connection attempt (and its timeout), hence ignored by
    // default.
    #[test]
    #[ignore = "requires network timeout; run with --ignored when desired"]
    fn new_with_unreachable_endpoint_returns_sink_error() {
        let result = OtlpGrpcSink::new(
            "http://127.0.0.1:1",
            OtlpSignalType::Metrics,
            DEFAULT_BATCH_SIZE,
            vec![],
            None,
        );
        match result {
            Err(err) => {
                let msg = err.to_string();
                assert!(
                    msg.contains("127.0.0.1:1") || msg.contains("otlp"),
                    "error should reference the endpoint: {msg}"
                );
            }
            Ok(_) => panic!("construction must fail when endpoint is unreachable"),
        }
    }
    #[test]
    fn new_with_invalid_endpoint_returns_error() {
        let result = OtlpGrpcSink::new(
            "not a url",
            OtlpSignalType::Metrics,
            DEFAULT_BATCH_SIZE,
            vec![],
            None,
        );
        assert!(result.is_err(), "invalid endpoint URL must be rejected");
    }
    // End-to-end config parsing: a full scenario document selecting the
    // OTLP encoder plus the otlp_grpc sink.
    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_otlp_metrics_deserializes() {
        use crate::config::ScenarioConfig;
        use crate::encoder::EncoderConfig;
        let yaml = r#"
name: otlp_test
rate: 10.0
generator:
  type: constant
  value: 1.0
encoder:
  type: otlp
sink:
  type: otlp_grpc
  endpoint: "http://localhost:4317"
  signal_type: metrics
"#;
        let config: ScenarioConfig = serde_yaml_ng::from_str(yaml).expect("deser ok");
        assert_eq!(config.name, "otlp_test");
        assert!(matches!(config.encoder, EncoderConfig::Otlp));
        assert!(matches!(
            config.sink,
            SinkConfig::OtlpGrpc {
                ref endpoint,
                signal_type: OtlpSignalType::Metrics,
                ..
            } if endpoint == "http://localhost:4317"
        ));
    }
    #[cfg(feature = "config")]
    #[test]
    fn scenario_yaml_with_otlp_logs_deserializes() {
        use crate::config::LogScenarioConfig;
        use crate::encoder::EncoderConfig;
        let yaml = r#"
name: otlp_logs_test
rate: 5.0
generator:
  type: template
  templates:
    - message: "Request processed"
encoder:
  type: otlp
sink:
  type: otlp_grpc
  endpoint: "http://localhost:4317"
  signal_type: logs
  batch_size: 50
"#;
        let config: LogScenarioConfig = serde_yaml_ng::from_str(yaml).expect("deser ok");
        assert_eq!(config.name, "otlp_logs_test");
        assert!(matches!(config.encoder, EncoderConfig::Otlp));
        match &config.sink {
            SinkConfig::OtlpGrpc {
                endpoint,
                signal_type,
                batch_size,
                ..
            } => {
                assert_eq!(endpoint, "http://localhost:4317");
                assert_eq!(*signal_type, OtlpSignalType::Logs);
                assert_eq!(*batch_size, Some(50));
            }
            other => panic!("expected OtlpGrpc, got {other:?}"),
        }
    }
}