use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use opentelemetry::InstrumentationScope;
use opentelemetry::{KeyValue, global};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord};
use crate::error::ExporterKind;
use crate::processors::RateLimiter;
/// A [`LogProcessor`] wrapper that gates records through a [`RateLimiter`]
/// before handing them to the wrapped processor `P`.
///
/// Only `emit` is gated; flush, shutdown and resource updates are passed
/// straight through to the wrapped processor.
pub(crate) struct RateLimitedLogProcessor<P> {
/// The wrapped processor that receives every record the limiter admits.
inner: P,
/// Limiter consulted via `check()` on each `emit`; its `max_rate()` is what
/// the `Debug` output reports.
limiter: Arc<RateLimiter>,
}
/// Debug output lists the wrapped processor and the limiter's configured
/// `max_rate` rather than the limiter itself.
impl<P: Debug> Debug for RateLimitedLogProcessor<P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { inner, limiter } = self;

        let mut out = f.debug_struct("RateLimitedLogProcessor");
        out.field("inner", inner);
        out.field("max_rate", &limiter.max_rate());
        out.finish()
    }
}
impl<P> RateLimitedLogProcessor<P> {
    /// Wraps `inner` with a freshly created rate limiter.
    ///
    /// * `inner` - the processor that admitted records are forwarded to.
    /// * `max_rate` - passed unchanged to `RateLimiter::new`; the exact unit
    ///   or time window is defined by `RateLimiter` (not visible here).
    /// * `exporter` - passed unchanged to `RateLimiter::new`.
    /// * `instance_id` - appended to the `otel.component.name` attribute so
    ///   that separate processor instances get distinct component names.
    ///
    /// A `otel.sdk.processor.log.processed` counter is built from the global
    /// `apollo-opentelemetry` meter and handed to the limiter together with
    /// the component attributes.
    pub(crate) fn new(inner: P, max_rate: u32, exporter: ExporterKind, instance_id: u64) -> Self {
        let processed_counter = global::meter("apollo-opentelemetry")
            .u64_counter("otel.sdk.processor.log.processed")
            .with_description("The number of log records for which the processing has finished")
            .with_unit("{log_record}")
            .build();

        let component_attributes = vec![
            KeyValue::new("otel.component.type", "rate_limited_log_processor"),
            KeyValue::new(
                "otel.component.name",
                format!("rate_limited_log_processor/{}", instance_id),
            ),
        ];

        let limiter = Arc::new(RateLimiter::new(
            max_rate,
            processed_counter,
            component_attributes,
            exporter,
        ));

        Self { inner, limiter }
    }
}
impl<P: LogProcessor> LogProcessor for RateLimitedLogProcessor<P> {
    /// Forwards `record` to the wrapped processor only when the limiter's
    /// `check()` returns `true`; otherwise the record is not forwarded.
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let admitted = self.limiter.check();
        if !admitted {
            return;
        }
        P::emit(&self.inner, record, instrumentation);
    }

    /// Flushes the wrapped processor; the limiter is not involved.
    fn force_flush(&self) -> OTelSdkResult {
        P::force_flush(&self.inner)
    }

    /// Shuts down the wrapped processor, passing `timeout` through unchanged.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        P::shutdown_with_timeout(&self.inner, timeout)
    }

    /// Hands `resource` to the wrapped processor.
    fn set_resource(&mut self, resource: &Resource) {
        P::set_resource(&mut self.inner, resource);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use opentelemetry::logs::{Logger, LoggerProvider};
    use opentelemetry_sdk::logs::SdkLoggerProvider;

    use super::*;

    /// Inner-processor stub that counts how often each `LogProcessor` method
    /// is invoked, so tests can verify that calls really reach it.
    #[derive(Debug, Default)]
    struct MockLogProcessor {
        emit_count: AtomicUsize,
        force_flush_count: AtomicUsize,
        shutdown_count: AtomicUsize,
        set_resource_count: AtomicUsize,
    }

    impl LogProcessor for MockLogProcessor {
        fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {
            self.emit_count.fetch_add(1, Ordering::SeqCst);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.force_flush_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
            self.shutdown_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {
            self.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Cloneable handle around [`MockLogProcessor`]: one clone is moved into
    /// the processor under test while the test keeps another to read counters.
    #[derive(Debug, Clone, Default)]
    struct SharedMockLogProcessor {
        inner: Arc<MockLogProcessor>,
    }

    impl LogProcessor for SharedMockLogProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            self.inner.emit(record, instrumentation);
        }

        fn force_flush(&self) -> OTelSdkResult {
            self.inner.force_flush()
        }

        fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
            self.inner.shutdown_with_timeout(timeout)
        }

        fn set_resource(&mut self, _resource: &Resource) {
            // The `Arc` only gives shared access, so the `&mut self` method on
            // the mock cannot be called; record the call on its counter directly.
            self.inner.set_resource_count.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn create_test_scope() -> InstrumentationScope {
        InstrumentationScope::builder("test").build()
    }

    #[test]
    fn logs_within_limit_are_forwarded() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let scope = create_test_scope();

        for _ in 0..50 {
            let mut record = logger.create_log_record();
            processor.emit(&mut record, &scope);
        }

        assert_eq!(mock.inner.emit_count.load(Ordering::SeqCst), 50);
    }

    #[test]
    fn logs_over_limit_are_dropped() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock.clone(), 1, ExporterKind::OtlpHttp, 0);
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let scope = create_test_scope();

        for _ in 0..10 {
            let mut record = logger.create_log_record();
            processor.emit(&mut record, &scope);
        }

        assert_eq!(mock.inner.emit_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn force_flush_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        let result = processor.force_flush();

        assert!(result.is_ok());
        // Without this check the test would pass even if the call never
        // reached the wrapped processor.
        assert_eq!(mock.inner.force_flush_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn shutdown_with_timeout_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);

        let result = processor.shutdown_with_timeout(Duration::from_secs(5));

        assert!(result.is_ok());
        assert_eq!(mock.inner.shutdown_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn set_resource_delegates_to_inner() {
        let mock = SharedMockLogProcessor::default();
        let mut processor =
            RateLimitedLogProcessor::new(mock.clone(), 100, ExporterKind::OtlpHttp, 0);
        let resource = Resource::builder().build();

        processor.set_resource(&resource);

        assert_eq!(mock.inner.set_resource_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn debug_format_shows_inner_and_max_rate() {
        let mock = SharedMockLogProcessor::default();
        let processor = RateLimitedLogProcessor::new(mock, 100, ExporterKind::OtlpHttp, 0);

        let debug_str = format!("{:?}", processor);

        assert!(debug_str.contains("RateLimitedLogProcessor"));
        assert!(debug_str.contains("max_rate"));
        assert!(debug_str.contains("100"));
    }
}