opentelemetry_lambda_extension/
aggregator.rs

1//! Signal aggregation and batching.
2//!
3//! This module provides queue-based batching for OTLP signals before export.
4//! Each signal type (traces, metrics, logs) has its own queue with size constraints.
5
6use crate::config::FlushConfig;
7use crate::receiver::Signal;
8use opentelemetry_proto::tonic::collector::{
9    logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
10    trace::v1::ExportTraceServiceRequest,
11};
12use prost::Message;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use tokio::sync::{Mutex, Notify, mpsc};
16
/// Batched signals ready for export.
///
/// One variant per OTLP signal type; each wraps a merged export request
/// produced from the corresponding queue's batch.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum BatchedSignal {
    /// Batched trace spans.
    Traces(ExportTraceServiceRequest),
    /// Batched metrics.
    Metrics(ExportMetricsServiceRequest),
    /// Batched log records.
    Logs(ExportLogsServiceRequest),
}
28
29impl BatchedSignal {
30    /// Returns the approximate size of this batch in bytes.
31    pub fn size_bytes(&self) -> usize {
32        match self {
33            BatchedSignal::Traces(req) => req.encoded_len(),
34            BatchedSignal::Metrics(req) => req.encoded_len(),
35            BatchedSignal::Logs(req) => req.encoded_len(),
36        }
37    }
38}
39
/// Default maximum queue entries (items pending export).
///
/// Applied independently to each signal type's queue; oldest entries are
/// evicted when the limit is reached (see `SignalQueue::push`).
const DEFAULT_MAX_QUEUE_ENTRIES: usize = 10_000;
/// Default maximum queue size in bytes (sum of encoded protobuf lengths).
const DEFAULT_MAX_QUEUE_BYTES: usize = 64 * 1024 * 1024; // 64 MB
44
/// Queue for batching a single signal type.
///
/// Bounded both by entry count and by total encoded bytes; when either limit
/// is exceeded, the oldest entries are evicted (see `push`).
struct SignalQueue<T> {
    // Pending items, oldest at the front.
    items: VecDeque<T>,
    // Per-batch byte limit enforced by `get_batch`.
    max_batch_bytes: usize,
    // Per-batch entry limit enforced by `get_batch`.
    max_batch_entries: usize,
    // Queue-wide entry limit enforced by `push`.
    max_queue_entries: usize,
    // Queue-wide byte limit enforced by `push`.
    max_queue_bytes: usize,
    // Running total of encoded bytes currently queued.
    current_bytes: usize,
    // Number of items evicted/dropped because of queue limits.
    dropped_count: u64,
}
55
56impl<T: Message + Default + Clone> SignalQueue<T> {
57    fn new(config: &FlushConfig) -> Self {
58        Self {
59            items: VecDeque::new(),
60            max_batch_bytes: config.max_batch_bytes,
61            max_batch_entries: config.max_batch_entries,
62            max_queue_entries: DEFAULT_MAX_QUEUE_ENTRIES,
63            max_queue_bytes: DEFAULT_MAX_QUEUE_BYTES,
64            current_bytes: 0,
65            dropped_count: 0,
66        }
67    }
68
69    fn push(&mut self, item: T) {
70        let item_size = item.encoded_len();
71
72        // Drop oldest items if queue is full
73        while !self.items.is_empty()
74            && (self.items.len() >= self.max_queue_entries
75                || self.current_bytes + item_size > self.max_queue_bytes)
76        {
77            if let Some(dropped) = self.items.pop_front() {
78                self.current_bytes = self.current_bytes.saturating_sub(dropped.encoded_len());
79                self.dropped_count += 1;
80            }
81        }
82
83        self.current_bytes += item_size;
84        self.items.push_back(item);
85    }
86
87    fn dropped_count(&self) -> u64 {
88        self.dropped_count
89    }
90
91    fn len(&self) -> usize {
92        self.items.len()
93    }
94
95    fn is_empty(&self) -> bool {
96        self.items.is_empty()
97    }
98
99    fn get_batch(&mut self) -> Vec<T> {
100        let mut batch = Vec::new();
101        let mut batch_size = 0;
102
103        while let Some(item) = self.items.pop_front() {
104            let item_size = item.encoded_len();
105
106            if !batch.is_empty()
107                && (batch_size + item_size > self.max_batch_bytes
108                    || batch.len() >= self.max_batch_entries)
109            {
110                // Put item back and re-add its size to queue tracking
111                self.items.push_front(item);
112                break;
113            }
114
115            self.current_bytes = self.current_bytes.saturating_sub(item_size);
116            batch.push(item);
117            batch_size += item_size;
118        }
119
120        batch
121    }
122
123    fn drain_all(&mut self) -> Vec<T> {
124        self.current_bytes = 0;
125        self.items.drain(..).collect()
126    }
127}
128
/// Aggregator for batching OTLP signals.
///
/// Receives signals from the OTLP receiver and batches them for efficient export.
/// Supports separate queues for traces, metrics, and logs with configurable limits.
pub struct SignalAggregator {
    // Pending trace export requests.
    traces: Arc<Mutex<SignalQueue<ExportTraceServiceRequest>>>,
    // Pending metrics export requests.
    metrics: Arc<Mutex<SignalQueue<ExportMetricsServiceRequest>>>,
    // Pending log export requests.
    logs: Arc<Mutex<SignalQueue<ExportLogsServiceRequest>>>,
    // Wakes a waiting flusher whenever a signal is added (see `add`).
    notify: Arc<Notify>,
    // NOTE(review): stored but never read in this file — presumably retained
    // for future use (e.g. flush intervals); confirm before removing.
    #[allow(dead_code)]
    config: FlushConfig,
}
141
142impl SignalAggregator {
143    /// Creates a new signal aggregator with the given configuration.
144    pub fn new(config: FlushConfig) -> Self {
145        Self {
146            traces: Arc::new(Mutex::new(SignalQueue::new(&config))),
147            metrics: Arc::new(Mutex::new(SignalQueue::new(&config))),
148            logs: Arc::new(Mutex::new(SignalQueue::new(&config))),
149            notify: Arc::new(Notify::new()),
150            config,
151        }
152    }
153
154    /// Creates a new aggregator with default configuration.
155    pub fn with_defaults() -> Self {
156        Self::new(FlushConfig::default())
157    }
158
159    /// Adds a signal to the appropriate queue.
160    pub async fn add(&self, signal: Signal) {
161        match signal {
162            Signal::Traces(req) => {
163                let mut queue = self.traces.lock().await;
164                queue.push(req);
165            }
166            Signal::Metrics(req) => {
167                let mut queue = self.metrics.lock().await;
168                queue.push(req);
169            }
170            Signal::Logs(req) => {
171                let mut queue = self.logs.lock().await;
172                queue.push(req);
173            }
174        }
175        self.notify.notify_one();
176    }
177
178    /// Runs the aggregator, receiving signals from a channel.
179    ///
180    /// This method processes incoming signals until the channel is closed.
181    pub async fn run(&self, mut signal_rx: mpsc::Receiver<Signal>) {
182        while let Some(signal) = signal_rx.recv().await {
183            self.add(signal).await;
184        }
185        tracing::debug!("Signal aggregator channel closed");
186    }
187
188    /// Gets the next batch of traces for export.
189    ///
190    /// Returns `None` if the trace queue is empty.
191    pub async fn get_trace_batch(&self) -> Option<BatchedSignal> {
192        let mut queue = self.traces.lock().await;
193        let batch = queue.get_batch();
194
195        if batch.is_empty() {
196            return None;
197        }
198
199        let merged = merge_trace_requests(batch);
200        Some(BatchedSignal::Traces(merged))
201    }
202
203    /// Gets the next batch of metrics for export.
204    ///
205    /// Returns `None` if the metrics queue is empty.
206    pub async fn get_metrics_batch(&self) -> Option<BatchedSignal> {
207        let mut queue = self.metrics.lock().await;
208        let batch = queue.get_batch();
209
210        if batch.is_empty() {
211            return None;
212        }
213
214        let merged = merge_metrics_requests(batch);
215        Some(BatchedSignal::Metrics(merged))
216    }
217
218    /// Gets the next batch of logs for export.
219    ///
220    /// Returns `None` if the logs queue is empty.
221    pub async fn get_logs_batch(&self) -> Option<BatchedSignal> {
222        let mut queue = self.logs.lock().await;
223        let batch = queue.get_batch();
224
225        if batch.is_empty() {
226            return None;
227        }
228
229        let merged = merge_logs_requests(batch);
230        Some(BatchedSignal::Logs(merged))
231    }
232
233    /// Gets all available batches across all signal types.
234    pub async fn get_all_batches(&self) -> Vec<BatchedSignal> {
235        let mut batches = Vec::new();
236
237        while let Some(batch) = self.get_trace_batch().await {
238            batches.push(batch);
239        }
240
241        while let Some(batch) = self.get_metrics_batch().await {
242            batches.push(batch);
243        }
244
245        while let Some(batch) = self.get_logs_batch().await {
246            batches.push(batch);
247        }
248
249        batches
250    }
251
252    /// Drains all signals from all queues.
253    ///
254    /// Use this for shutdown to ensure all data is exported.
255    pub async fn drain_all(&self) -> Vec<BatchedSignal> {
256        let mut batches = Vec::new();
257
258        {
259            let mut queue = self.traces.lock().await;
260            let all = queue.drain_all();
261            if !all.is_empty() {
262                batches.push(BatchedSignal::Traces(merge_trace_requests(all)));
263            }
264        }
265
266        {
267            let mut queue = self.metrics.lock().await;
268            let all = queue.drain_all();
269            if !all.is_empty() {
270                batches.push(BatchedSignal::Metrics(merge_metrics_requests(all)));
271            }
272        }
273
274        {
275            let mut queue = self.logs.lock().await;
276            let all = queue.drain_all();
277            if !all.is_empty() {
278                batches.push(BatchedSignal::Logs(merge_logs_requests(all)));
279            }
280        }
281
282        batches
283    }
284
285    /// Returns the total count of pending items across all queues.
286    pub async fn pending_count(&self) -> usize {
287        let traces = self.traces.lock().await.len();
288        let metrics = self.metrics.lock().await.len();
289        let logs = self.logs.lock().await.len();
290        traces + metrics + logs
291    }
292
293    /// Returns whether all queues are empty.
294    pub async fn is_empty(&self) -> bool {
295        self.traces.lock().await.is_empty()
296            && self.metrics.lock().await.is_empty()
297            && self.logs.lock().await.is_empty()
298    }
299
300    /// Waits until there is data available or the notify is triggered.
301    pub async fn wait_for_data(&self) {
302        self.notify.notified().await;
303    }
304
305    /// Returns a clone of the notify handle for external coordination.
306    pub fn notify_handle(&self) -> Arc<Notify> {
307        self.notify.clone()
308    }
309
310    /// Returns the total count of dropped items across all queues.
311    ///
312    /// Items are dropped when the queue reaches its size limits.
313    pub async fn dropped_count(&self) -> u64 {
314        let traces = self.traces.lock().await.dropped_count();
315        let metrics = self.metrics.lock().await.dropped_count();
316        let logs = self.logs.lock().await.dropped_count();
317        traces + metrics + logs
318    }
319}
320
321fn merge_trace_requests(requests: Vec<ExportTraceServiceRequest>) -> ExportTraceServiceRequest {
322    ExportTraceServiceRequest {
323        resource_spans: requests
324            .into_iter()
325            .flat_map(|r| r.resource_spans)
326            .collect(),
327    }
328}
329
330fn merge_metrics_requests(
331    requests: Vec<ExportMetricsServiceRequest>,
332) -> ExportMetricsServiceRequest {
333    ExportMetricsServiceRequest {
334        resource_metrics: requests
335            .into_iter()
336            .flat_map(|r| r.resource_metrics)
337            .collect(),
338    }
339}
340
341fn merge_logs_requests(requests: Vec<ExportLogsServiceRequest>) -> ExportLogsServiceRequest {
342    ExportLogsServiceRequest {
343        resource_logs: requests.into_iter().flat_map(|r| r.resource_logs).collect(),
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};

    /// Builds a minimal trace request: one resource, one scope, one span
    /// with the given name and fixed trace/span ids.
    fn make_trace_request(span_name: &str) -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                scope_spans: vec![ScopeSpans {
                    spans: vec![Span {
                        name: span_name.to_string(),
                        trace_id: vec![1; 16],
                        span_id: vec![1; 8],
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            }],
        }
    }

    // A signal added via `add` comes back out of `get_trace_batch` intact.
    #[tokio::test]
    async fn test_add_and_get_traces() {
        let aggregator = SignalAggregator::with_defaults();

        let signal = Signal::Traces(make_trace_request("test-span"));
        aggregator.add(signal).await;

        let batch = aggregator.get_trace_batch().await;
        assert!(batch.is_some());

        match batch.unwrap() {
            BatchedSignal::Traces(req) => {
                assert_eq!(req.resource_spans.len(), 1);
                assert_eq!(
                    req.resource_spans[0].scope_spans[0].spans[0].name,
                    "test-span"
                );
            }
            _ => panic!("Expected traces batch"),
        }
    }

    // Multiple queued requests are merged into one batch, preserving all
    // resource_spans.
    #[tokio::test]
    async fn test_merge_multiple_requests() {
        let aggregator = SignalAggregator::with_defaults();

        for i in 0..3 {
            let signal = Signal::Traces(make_trace_request(&format!("span-{}", i)));
            aggregator.add(signal).await;
        }

        let batch = aggregator.get_trace_batch().await;
        assert!(batch.is_some());

        match batch.unwrap() {
            BatchedSignal::Traces(req) => {
                assert_eq!(req.resource_spans.len(), 3);
            }
            _ => panic!("Expected traces batch"),
        }
    }

    // Empty queues yield `None` from all three batch getters.
    #[tokio::test]
    async fn test_empty_queue_returns_none() {
        let aggregator = SignalAggregator::with_defaults();

        assert!(aggregator.get_trace_batch().await.is_none());
        assert!(aggregator.get_metrics_batch().await.is_none());
        assert!(aggregator.get_logs_batch().await.is_none());
    }

    // `pending_count` reflects the number of queued (not yet batched) items.
    #[tokio::test]
    async fn test_pending_count() {
        let aggregator = SignalAggregator::with_defaults();

        assert_eq!(aggregator.pending_count().await, 0);

        aggregator
            .add(Signal::Traces(make_trace_request("span-1")))
            .await;
        aggregator
            .add(Signal::Traces(make_trace_request("span-2")))
            .await;

        assert_eq!(aggregator.pending_count().await, 2);
    }

    // `drain_all` merges everything into one batch per signal type and
    // leaves the aggregator empty.
    #[tokio::test]
    async fn test_drain_all() {
        let aggregator = SignalAggregator::with_defaults();

        aggregator
            .add(Signal::Traces(make_trace_request("span-1")))
            .await;
        aggregator
            .add(Signal::Traces(make_trace_request("span-2")))
            .await;

        let batches = aggregator.drain_all().await;
        assert_eq!(batches.len(), 1);

        assert!(aggregator.is_empty().await);
    }

    // With max_batch_entries = 2, five items come out as batches of 2, 2, 1.
    #[tokio::test]
    async fn test_batch_size_limit() {
        let config = FlushConfig {
            max_batch_entries: 2,
            ..Default::default()
        };
        let aggregator = SignalAggregator::new(config);

        for i in 0..5 {
            aggregator
                .add(Signal::Traces(make_trace_request(&format!("span-{}", i))))
                .await;
        }

        let batch1 = aggregator.get_trace_batch().await.unwrap();
        match batch1 {
            BatchedSignal::Traces(req) => assert_eq!(req.resource_spans.len(), 2),
            _ => panic!("Expected traces"),
        }

        let batch2 = aggregator.get_trace_batch().await.unwrap();
        match batch2 {
            BatchedSignal::Traces(req) => assert_eq!(req.resource_spans.len(), 2),
            _ => panic!("Expected traces"),
        }

        let batch3 = aggregator.get_trace_batch().await.unwrap();
        match batch3 {
            BatchedSignal::Traces(req) => assert_eq!(req.resource_spans.len(), 1),
            _ => panic!("Expected traces"),
        }

        assert!(aggregator.get_trace_batch().await.is_none());
    }

    // One batch per signal type is returned when all three queues have data.
    #[tokio::test]
    async fn test_get_all_batches() {
        let aggregator = SignalAggregator::with_defaults();

        aggregator
            .add(Signal::Traces(make_trace_request("span")))
            .await;
        aggregator
            .add(Signal::Metrics(ExportMetricsServiceRequest::default()))
            .await;
        aggregator
            .add(Signal::Logs(ExportLogsServiceRequest::default()))
            .await;

        let batches = aggregator.get_all_batches().await;
        assert_eq!(batches.len(), 3);
    }

    // A non-empty request reports a positive encoded size.
    #[test]
    fn test_batched_signal_size() {
        let req = make_trace_request("test");
        let batch = BatchedSignal::Traces(req);
        assert!(batch.size_bytes() > 0);
    }

    // NOTE(review): this test only checks the initial dropped count — the
    // actual eviction behavior is exercised by `test_signal_queue_bounds`
    // below. Consider strengthening or removing it.
    #[tokio::test]
    async fn test_queue_drops_oldest_when_full() {
        // Create a queue with a very low entry limit
        let config = FlushConfig {
            max_batch_entries: 100, // batch limit
            ..Default::default()
        };
        let aggregator = SignalAggregator::new(config);

        // The internal queue limit is 10,000 by default, so we need to
        // directly test the SignalQueue behaviour
        assert_eq!(aggregator.dropped_count().await, 0);
    }

    // Filling the queue to its entry limit and pushing more evicts the
    // oldest entries one-for-one, incrementing `dropped_count`.
    #[test]
    fn test_signal_queue_bounds() {
        use super::{DEFAULT_MAX_QUEUE_ENTRIES, SignalQueue};

        let config = FlushConfig::default();
        let mut queue: SignalQueue<ExportTraceServiceRequest> = SignalQueue::new(&config);

        // Add items up to the limit
        for i in 0..DEFAULT_MAX_QUEUE_ENTRIES {
            queue.push(make_trace_request(&format!("span-{}", i)));
        }
        assert_eq!(queue.len(), DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(queue.dropped_count(), 0);

        // Add one more - should drop the oldest
        queue.push(make_trace_request("overflow-span"));
        assert_eq!(queue.len(), DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(queue.dropped_count(), 1);

        // Add a few more
        queue.push(make_trace_request("overflow-span-2"));
        queue.push(make_trace_request("overflow-span-3"));
        assert_eq!(queue.dropped_count(), 3);
    }
}