opentelemetry_lambda_extension/
aggregator.rs1use crate::config::FlushConfig;
7use crate::receiver::Signal;
8use opentelemetry_proto::tonic::collector::{
9 logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
10 trace::v1::ExportTraceServiceRequest,
11};
12use prost::Message;
13use std::collections::VecDeque;
14use std::sync::Arc;
15use tokio::sync::{Mutex, Notify, mpsc};
16
17#[non_exhaustive]
19#[derive(Debug, Clone)]
20pub enum BatchedSignal {
21 Traces(ExportTraceServiceRequest),
23 Metrics(ExportMetricsServiceRequest),
25 Logs(ExportLogsServiceRequest),
27}
28
29impl BatchedSignal {
30 pub fn size_bytes(&self) -> usize {
32 match self {
33 BatchedSignal::Traces(req) => req.encoded_len(),
34 BatchedSignal::Metrics(req) => req.encoded_len(),
35 BatchedSignal::Logs(req) => req.encoded_len(),
36 }
37 }
38}
39
/// Maximum number of requests a single [`SignalQueue`] retains.
const DEFAULT_MAX_QUEUE_ENTRIES: usize = 10_000;

/// Maximum total encoded size, in bytes, a single [`SignalQueue`] retains (64 MiB).
const DEFAULT_MAX_QUEUE_BYTES: usize = 64 * 1024 * 1024;

/// Bounded FIFO of pending export requests for one signal type.
struct SignalQueue<T> {
    /// Pending requests, oldest at the front.
    items: VecDeque<T>,
    /// Encoded-size budget for one batch returned by `get_batch`.
    max_batch_bytes: usize,
    /// Entry-count budget for one batch returned by `get_batch`.
    max_batch_entries: usize,
    /// Entry count at which `push` starts evicting the oldest requests.
    max_queue_entries: usize,
    /// Encoded-size total above which `push` starts evicting the oldest requests.
    max_queue_bytes: usize,
    /// Running sum of the encoded lengths of everything in `items`.
    current_bytes: usize,
    /// Number of requests evicted by `push` since the queue was created.
    dropped_count: u64,
}
55
56impl<T: Message + Default + Clone> SignalQueue<T> {
57 fn new(config: &FlushConfig) -> Self {
58 Self {
59 items: VecDeque::new(),
60 max_batch_bytes: config.max_batch_bytes,
61 max_batch_entries: config.max_batch_entries,
62 max_queue_entries: DEFAULT_MAX_QUEUE_ENTRIES,
63 max_queue_bytes: DEFAULT_MAX_QUEUE_BYTES,
64 current_bytes: 0,
65 dropped_count: 0,
66 }
67 }
68
69 fn push(&mut self, item: T) {
70 let item_size = item.encoded_len();
71
72 while !self.items.is_empty()
74 && (self.items.len() >= self.max_queue_entries
75 || self.current_bytes + item_size > self.max_queue_bytes)
76 {
77 if let Some(dropped) = self.items.pop_front() {
78 self.current_bytes = self.current_bytes.saturating_sub(dropped.encoded_len());
79 self.dropped_count += 1;
80 }
81 }
82
83 self.current_bytes += item_size;
84 self.items.push_back(item);
85 }
86
87 fn dropped_count(&self) -> u64 {
88 self.dropped_count
89 }
90
91 fn len(&self) -> usize {
92 self.items.len()
93 }
94
95 fn is_empty(&self) -> bool {
96 self.items.is_empty()
97 }
98
99 fn get_batch(&mut self) -> Vec<T> {
100 let mut batch = Vec::new();
101 let mut batch_size = 0;
102
103 while let Some(item) = self.items.pop_front() {
104 let item_size = item.encoded_len();
105
106 if !batch.is_empty()
107 && (batch_size + item_size > self.max_batch_bytes
108 || batch.len() >= self.max_batch_entries)
109 {
110 self.items.push_front(item);
112 break;
113 }
114
115 self.current_bytes = self.current_bytes.saturating_sub(item_size);
116 batch.push(item);
117 batch_size += item_size;
118 }
119
120 batch
121 }
122
123 fn drain_all(&mut self) -> Vec<T> {
124 self.current_bytes = 0;
125 self.items.drain(..).collect()
126 }
127}
128
129pub struct SignalAggregator {
134 traces: Arc<Mutex<SignalQueue<ExportTraceServiceRequest>>>,
135 metrics: Arc<Mutex<SignalQueue<ExportMetricsServiceRequest>>>,
136 logs: Arc<Mutex<SignalQueue<ExportLogsServiceRequest>>>,
137 notify: Arc<Notify>,
138 #[allow(dead_code)]
139 config: FlushConfig,
140}
141
142impl SignalAggregator {
143 pub fn new(config: FlushConfig) -> Self {
145 Self {
146 traces: Arc::new(Mutex::new(SignalQueue::new(&config))),
147 metrics: Arc::new(Mutex::new(SignalQueue::new(&config))),
148 logs: Arc::new(Mutex::new(SignalQueue::new(&config))),
149 notify: Arc::new(Notify::new()),
150 config,
151 }
152 }
153
154 pub fn with_defaults() -> Self {
156 Self::new(FlushConfig::default())
157 }
158
159 pub async fn add(&self, signal: Signal) {
161 match signal {
162 Signal::Traces(req) => {
163 let mut queue = self.traces.lock().await;
164 queue.push(req);
165 }
166 Signal::Metrics(req) => {
167 let mut queue = self.metrics.lock().await;
168 queue.push(req);
169 }
170 Signal::Logs(req) => {
171 let mut queue = self.logs.lock().await;
172 queue.push(req);
173 }
174 }
175 self.notify.notify_one();
176 }
177
178 pub async fn run(&self, mut signal_rx: mpsc::Receiver<Signal>) {
182 while let Some(signal) = signal_rx.recv().await {
183 self.add(signal).await;
184 }
185 tracing::debug!("Signal aggregator channel closed");
186 }
187
188 pub async fn get_trace_batch(&self) -> Option<BatchedSignal> {
192 let mut queue = self.traces.lock().await;
193 let batch = queue.get_batch();
194
195 if batch.is_empty() {
196 return None;
197 }
198
199 let merged = merge_trace_requests(batch);
200 Some(BatchedSignal::Traces(merged))
201 }
202
203 pub async fn get_metrics_batch(&self) -> Option<BatchedSignal> {
207 let mut queue = self.metrics.lock().await;
208 let batch = queue.get_batch();
209
210 if batch.is_empty() {
211 return None;
212 }
213
214 let merged = merge_metrics_requests(batch);
215 Some(BatchedSignal::Metrics(merged))
216 }
217
218 pub async fn get_logs_batch(&self) -> Option<BatchedSignal> {
222 let mut queue = self.logs.lock().await;
223 let batch = queue.get_batch();
224
225 if batch.is_empty() {
226 return None;
227 }
228
229 let merged = merge_logs_requests(batch);
230 Some(BatchedSignal::Logs(merged))
231 }
232
233 pub async fn get_all_batches(&self) -> Vec<BatchedSignal> {
235 let mut batches = Vec::new();
236
237 while let Some(batch) = self.get_trace_batch().await {
238 batches.push(batch);
239 }
240
241 while let Some(batch) = self.get_metrics_batch().await {
242 batches.push(batch);
243 }
244
245 while let Some(batch) = self.get_logs_batch().await {
246 batches.push(batch);
247 }
248
249 batches
250 }
251
252 pub async fn drain_all(&self) -> Vec<BatchedSignal> {
256 let mut batches = Vec::new();
257
258 {
259 let mut queue = self.traces.lock().await;
260 let all = queue.drain_all();
261 if !all.is_empty() {
262 batches.push(BatchedSignal::Traces(merge_trace_requests(all)));
263 }
264 }
265
266 {
267 let mut queue = self.metrics.lock().await;
268 let all = queue.drain_all();
269 if !all.is_empty() {
270 batches.push(BatchedSignal::Metrics(merge_metrics_requests(all)));
271 }
272 }
273
274 {
275 let mut queue = self.logs.lock().await;
276 let all = queue.drain_all();
277 if !all.is_empty() {
278 batches.push(BatchedSignal::Logs(merge_logs_requests(all)));
279 }
280 }
281
282 batches
283 }
284
285 pub async fn pending_count(&self) -> usize {
287 let traces = self.traces.lock().await.len();
288 let metrics = self.metrics.lock().await.len();
289 let logs = self.logs.lock().await.len();
290 traces + metrics + logs
291 }
292
293 pub async fn is_empty(&self) -> bool {
295 self.traces.lock().await.is_empty()
296 && self.metrics.lock().await.is_empty()
297 && self.logs.lock().await.is_empty()
298 }
299
300 pub async fn wait_for_data(&self) {
302 self.notify.notified().await;
303 }
304
305 pub fn notify_handle(&self) -> Arc<Notify> {
307 self.notify.clone()
308 }
309
310 pub async fn dropped_count(&self) -> u64 {
314 let traces = self.traces.lock().await.dropped_count();
315 let metrics = self.metrics.lock().await.dropped_count();
316 let logs = self.logs.lock().await.dropped_count();
317 traces + metrics + logs
318 }
319}
320
321fn merge_trace_requests(requests: Vec<ExportTraceServiceRequest>) -> ExportTraceServiceRequest {
322 ExportTraceServiceRequest {
323 resource_spans: requests
324 .into_iter()
325 .flat_map(|r| r.resource_spans)
326 .collect(),
327 }
328}
329
330fn merge_metrics_requests(
331 requests: Vec<ExportMetricsServiceRequest>,
332) -> ExportMetricsServiceRequest {
333 ExportMetricsServiceRequest {
334 resource_metrics: requests
335 .into_iter()
336 .flat_map(|r| r.resource_metrics)
337 .collect(),
338 }
339}
340
341fn merge_logs_requests(requests: Vec<ExportLogsServiceRequest>) -> ExportLogsServiceRequest {
342 ExportLogsServiceRequest {
343 resource_logs: requests.into_iter().flat_map(|r| r.resource_logs).collect(),
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span};

    /// Builds a trace export request containing exactly one span named `span_name`.
    fn make_trace_request(span_name: &str) -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                scope_spans: vec![ScopeSpans {
                    spans: vec![Span {
                        name: span_name.to_string(),
                        trace_id: vec![1; 16],
                        span_id: vec![1; 8],
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            }],
        }
    }

    /// Unwraps the trace request from `batch`, failing the test for any other variant.
    fn expect_traces(batch: BatchedSignal) -> ExportTraceServiceRequest {
        match batch {
            BatchedSignal::Traces(req) => req,
            other => panic!("Expected traces batch, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_add_and_get_traces() {
        let aggregator = SignalAggregator::with_defaults();

        aggregator
            .add(Signal::Traces(make_trace_request("test-span")))
            .await;

        let batch = aggregator
            .get_trace_batch()
            .await
            .expect("a trace request was queued");
        let req = expect_traces(batch);

        assert_eq!(req.resource_spans.len(), 1);
        assert_eq!(
            req.resource_spans[0].scope_spans[0].spans[0].name,
            "test-span"
        );
    }

    #[tokio::test]
    async fn test_merge_multiple_requests() {
        let aggregator = SignalAggregator::with_defaults();

        for i in 0..3 {
            aggregator
                .add(Signal::Traces(make_trace_request(&format!("span-{i}"))))
                .await;
        }

        let batch = aggregator
            .get_trace_batch()
            .await
            .expect("three trace requests were queued");

        assert_eq!(expect_traces(batch).resource_spans.len(), 3);
    }

    #[tokio::test]
    async fn test_empty_queue_returns_none() {
        let aggregator = SignalAggregator::with_defaults();

        assert!(aggregator.get_trace_batch().await.is_none());
        assert!(aggregator.get_metrics_batch().await.is_none());
        assert!(aggregator.get_logs_batch().await.is_none());
    }

    #[tokio::test]
    async fn test_pending_count() {
        let aggregator = SignalAggregator::with_defaults();

        assert_eq!(aggregator.pending_count().await, 0);

        for name in ["span-1", "span-2"] {
            aggregator
                .add(Signal::Traces(make_trace_request(name)))
                .await;
        }

        assert_eq!(aggregator.pending_count().await, 2);
    }

    #[tokio::test]
    async fn test_drain_all() {
        let aggregator = SignalAggregator::with_defaults();

        for name in ["span-1", "span-2"] {
            aggregator
                .add(Signal::Traces(make_trace_request(name)))
                .await;
        }

        // Both requests are traces, so they are merged into a single batch.
        let batches = aggregator.drain_all().await;
        assert_eq!(batches.len(), 1);

        assert!(aggregator.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_size_limit() {
        let config = FlushConfig {
            max_batch_entries: 2,
            ..Default::default()
        };
        let aggregator = SignalAggregator::new(config);

        for i in 0..5 {
            aggregator
                .add(Signal::Traces(make_trace_request(&format!("span-{i}"))))
                .await;
        }

        // Five requests with a two-entry batch limit come out as 2 + 2 + 1.
        for expected_len in [2, 2, 1] {
            let batch = aggregator
                .get_trace_batch()
                .await
                .expect("queue should still hold trace requests");
            assert_eq!(expect_traces(batch).resource_spans.len(), expected_len);
        }

        assert!(aggregator.get_trace_batch().await.is_none());
    }

    #[tokio::test]
    async fn test_get_all_batches() {
        let aggregator = SignalAggregator::with_defaults();

        aggregator
            .add(Signal::Traces(make_trace_request("span")))
            .await;
        aggregator
            .add(Signal::Metrics(ExportMetricsServiceRequest::default()))
            .await;
        aggregator
            .add(Signal::Logs(ExportLogsServiceRequest::default()))
            .await;

        let batches = aggregator.get_all_batches().await;
        assert_eq!(batches.len(), 3);
    }

    #[test]
    fn test_batched_signal_size() {
        let batch = BatchedSignal::Traces(make_trace_request("test"));
        assert!(batch.size_bytes() > 0);
    }

    #[tokio::test]
    async fn test_queue_drops_oldest_when_full() {
        // How many requests to push beyond the queue's entry limit.
        let overflow: usize = 5;

        let config = FlushConfig {
            max_batch_entries: 100,
            ..Default::default()
        };
        let aggregator = SignalAggregator::new(config);

        assert_eq!(aggregator.dropped_count().await, 0);

        for i in 0..DEFAULT_MAX_QUEUE_ENTRIES + overflow {
            aggregator
                .add(Signal::Traces(make_trace_request(&format!("span-{i}"))))
                .await;
        }

        // The queue stays at its entry limit and reports one drop per overflowing push.
        assert_eq!(aggregator.pending_count().await, DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(aggregator.dropped_count().await, overflow as u64);

        // The oldest requests were the ones evicted, so the first survivor is
        // the request pushed right after them.
        let first = aggregator
            .drain_all()
            .await
            .into_iter()
            .next()
            .expect("trace queue should not be empty");
        let req = expect_traces(first);

        assert_eq!(req.resource_spans.len(), DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(
            req.resource_spans[0].scope_spans[0].spans[0].name,
            format!("span-{overflow}")
        );
    }

    #[test]
    fn test_signal_queue_bounds() {
        let config = FlushConfig::default();
        let mut queue: SignalQueue<ExportTraceServiceRequest> = SignalQueue::new(&config);

        // Filling the queue exactly to its entry limit drops nothing.
        for i in 0..DEFAULT_MAX_QUEUE_ENTRIES {
            queue.push(make_trace_request(&format!("span-{i}")));
        }
        assert_eq!(queue.len(), DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(queue.dropped_count(), 0);

        // One more push evicts exactly one entry and keeps the length at the limit.
        queue.push(make_trace_request("overflow-span"));
        assert_eq!(queue.len(), DEFAULT_MAX_QUEUE_ENTRIES);
        assert_eq!(queue.dropped_count(), 1);

        queue.push(make_trace_request("overflow-span-2"));
        queue.push(make_trace_request("overflow-span-3"));
        assert_eq!(queue.dropped_count(), 3);
    }
}