1use crate::error::{ObservabilityError, ObservabilityResult};
4use crate::traits::{LogLevel, SpanStatus};
5use std::collections::{HashMap, VecDeque};
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::sync::{Arc, Mutex, RwLock};
8use web_time::{Duration, Instant};
9
10#[cfg(feature = "structured-logging")]
11use serde_json::Value as JsonValue;
12
/// Tunable limits that control how telemetry is buffered and when it is flushed.
#[derive(Clone, Debug)]
pub struct BatchingConfig {
    /// Item-count capacity given to each per-type buffer.
    pub max_batch_size: usize,
    /// Time since the last flush after which the next flush check triggers a flush.
    pub flush_interval: Duration,
    /// Total memory budget in bytes; the manager divides it by three per buffer.
    pub max_memory_bytes: usize,
    /// `true`: a full buffer rejects new items; `false`: it evicts the oldest ones.
    pub drop_on_overflow: bool,
    /// Combined number of buffered items that triggers a flush before the interval elapses.
    pub min_batch_size: usize,
}

impl Default for BatchingConfig {
    /// Returns the stock configuration: 100 items per buffer, a 5 second flush
    /// interval, a 1 MiB memory budget, drop-on-overflow enabled and a flush
    /// threshold of 10 buffered items.
    fn default() -> Self {
        const ONE_MIB: usize = 1024 * 1024;

        BatchingConfig {
            max_batch_size: 100,
            flush_interval: Duration::from_secs(5),
            max_memory_bytes: ONE_MIB,
            drop_on_overflow: true,
            min_batch_size: 10,
        }
    }
}
39
40#[derive(Debug, Clone)]
42pub enum TelemetryData {
43 Span(SpanData),
44 Metric(MetricData),
45 #[cfg(feature = "structured-logging")]
46 Log(LogData),
47}
48
49impl TelemetryData {
50 pub fn estimated_size(&self) -> usize {
52 match self {
53 TelemetryData::Span(span) => span.estimated_size(),
54 TelemetryData::Metric(metric) => metric.estimated_size(),
55 #[cfg(feature = "structured-logging")]
56 TelemetryData::Log(log) => log.estimated_size(),
57 }
58 }
59}
60
61impl MemoryEstimator for SpanData {
62 fn estimated_size(&self) -> usize {
63 self.estimated_size()
64 }
65}
66
67impl MemoryEstimator for MetricData {
68 fn estimated_size(&self) -> usize {
69 self.estimated_size()
70 }
71}
72
/// Allows `LogData` to be stored in a memory-bounded buffer.
#[cfg(feature = "structured-logging")]
impl MemoryEstimator for LogData {
    /// Reports the log record's approximate size in bytes.
    fn estimated_size(&self) -> usize {
        // Inherent associated functions take precedence over trait methods, so
        // this path names the inherent `LogData::estimated_size` and does not
        // recurse into this trait method.
        LogData::estimated_size(self)
    }
}
79
80#[derive(Debug, Clone)]
82pub struct SpanData {
83 pub span_id: String,
84 pub trace_id: String,
85 pub parent_span_id: Option<String>,
86 pub name: String,
87 pub start_time: Instant,
88 pub end_time: Option<Instant>,
89 pub status: SpanStatus,
90 pub attributes: HashMap<String, String>,
91}
92
93impl SpanData {
94 pub fn new(span_id: String, trace_id: String, name: String) -> Self {
95 Self {
96 span_id,
97 trace_id,
98 parent_span_id: None,
99 name,
100 start_time: Instant::now(),
101 end_time: None,
102 status: SpanStatus::Ok,
103 attributes: HashMap::new(),
104 }
105 }
106
107 pub fn with_parent(mut self, parent_span_id: String) -> Self {
108 self.parent_span_id = Some(parent_span_id);
109 self
110 }
111
112 pub fn add_attribute(&mut self, key: String, value: String) {
113 self.attributes.insert(key, value);
114 }
115
116 pub fn end(&mut self) {
117 self.end_time = Some(Instant::now());
118 }
119
120 pub fn duration(&self) -> Option<Duration> {
121 self.end_time.map(|end| end.duration_since(self.start_time))
122 }
123
124 fn estimated_size(&self) -> usize {
125 std::mem::size_of::<Self>()
126 + self.span_id.len()
127 + self.trace_id.len()
128 + self.parent_span_id.as_ref().map_or(0, |s| s.len())
129 + self.name.len()
130 + self
131 .attributes
132 .iter()
133 .map(|(k, v)| k.len() + v.len())
134 .sum::<usize>()
135 }
136}
137
/// A single metric sample captured for batching.
#[derive(Clone, Debug)]
pub struct MetricData {
    /// Metric name.
    pub name: String,
    /// Sampled value.
    pub value: f64,
    /// Label key/value pairs attached to the sample.
    pub labels: HashMap<String, String>,
    /// Instant at which the sample record was created.
    pub timestamp: Instant,
    /// Kind of metric this sample belongs to.
    pub metric_type: MetricType,
}

/// The kinds of metric a [`MetricData`] sample can represent.
#[derive(Clone, Debug)]
pub enum MetricType {
    Counter,
    Histogram,
    Gauge,
}

impl MetricData {
    /// Creates a sample timestamped with the current instant.
    pub fn new(
        name: String,
        value: f64,
        labels: HashMap<String, String>,
        metric_type: MetricType,
    ) -> Self {
        let timestamp = Instant::now();

        MetricData {
            name,
            value,
            labels,
            timestamp,
            metric_type,
        }
    }

    /// Approximates the sample's size in bytes: the struct itself plus the
    /// byte lengths of the name and of every label key and value.
    fn estimated_size(&self) -> usize {
        let label_bytes = self
            .labels
            .iter()
            .fold(0usize, |total, (key, value)| total + key.len() + value.len());

        std::mem::size_of::<Self>() + self.name.len() + label_bytes
    }
}
181
/// A structured log record captured for batching.
#[cfg(feature = "structured-logging")]
#[derive(Clone, Debug)]
pub struct LogData {
    /// Severity of the record.
    pub level: LogLevel,
    /// Log message text.
    pub message: String,
    /// Structured fields as a JSON value.
    pub fields: JsonValue,
    /// Instant at which the record was created.
    pub timestamp: Instant,
    /// Trace id, when trace context has been attached.
    pub trace_id: Option<String>,
    /// Span id, when trace context has been attached.
    pub span_id: Option<String>,
}

#[cfg(feature = "structured-logging")]
impl LogData {
    /// Creates a record timestamped with the current instant and without any
    /// trace context.
    pub fn new(level: LogLevel, message: String, fields: JsonValue) -> Self {
        let timestamp = Instant::now();

        LogData {
            level,
            message,
            fields,
            timestamp,
            trace_id: None,
            span_id: None,
        }
    }

    /// Builder-style setter that attaches both a trace id and a span id.
    pub fn with_trace_context(self, trace_id: String, span_id: String) -> Self {
        Self {
            trace_id: Some(trace_id),
            span_id: Some(span_id),
            ..self
        }
    }

    /// Approximates the record's size in bytes: the struct itself plus the
    /// byte lengths of the message, the serialized fields and the ids.
    fn estimated_size(&self) -> usize {
        // The fields are rendered to a string just to measure their length.
        let fields_bytes = self.fields.to_string().len();
        let trace_bytes = self.trace_id.as_deref().map_or(0, str::len);
        let span_bytes = self.span_id.as_deref().map_or(0, str::len);

        std::mem::size_of::<Self>() + self.message.len() + fields_bytes + trace_bytes + span_bytes
    }
}
221
/// FIFO buffer bounded by both an item count and an estimated memory budget.
///
/// When a new item does not fit, the buffer either rejects it
/// (`drop_on_overflow == true`) or evicts the oldest items to make room
/// (`drop_on_overflow == false`). Every item lost either way is counted in
/// [`MemoryEfficientBuffer::dropped_count`].
pub struct MemoryEfficientBuffer<T> {
    /// Buffered items, oldest at the front.
    buffer: VecDeque<T>,
    /// Maximum number of items held at once.
    max_size: usize,
    /// Sum of the estimated sizes of the buffered items, in bytes.
    current_memory: AtomicUsize,
    /// Upper bound for `current_memory`, in bytes.
    max_memory: usize,
    /// Number of items lost so far (rejected on push or evicted).
    dropped_count: AtomicU64,
    /// `true`: reject new items when full; `false`: evict the oldest items.
    drop_on_overflow: bool,
}

impl<T> MemoryEfficientBuffer<T>
where
    T: Clone,
{
    /// Creates an empty buffer holding at most `max_size` items and at most
    /// `max_memory` estimated bytes.
    pub fn new(max_size: usize, max_memory: usize, drop_on_overflow: bool) -> Self {
        Self {
            // Cap the eager allocation so a very large `max_size` does not
            // reserve that much memory up front.
            buffer: VecDeque::with_capacity(max_size.min(1000)),
            max_size,
            current_memory: AtomicUsize::new(0),
            max_memory,
            dropped_count: AtomicU64::new(0),
            drop_on_overflow,
        }
    }

    /// Appends `item`, enforcing the count and memory limits.
    ///
    /// Returns `true` when the item was stored and `false` when it was
    /// rejected. An item is rejected when:
    ///
    /// * it can never fit (`max_size` is zero, or its estimated size alone
    ///   exceeds `max_memory`) — regardless of the overflow mode, so a single
    ///   oversized item never wipes the buffered data or breaks the cap;
    /// * the buffer is full and `drop_on_overflow` is `true`.
    ///
    /// With `drop_on_overflow == false` the oldest items are evicted until the
    /// new item fits. Rejected and evicted items both increment the dropped
    /// counter.
    pub fn push(&mut self, item: T) -> bool
    where
        T: MemoryEstimator,
    {
        let item_size = item.estimated_size();

        // No amount of eviction can make room for this item.
        if self.max_size == 0 || item_size > self.max_memory {
            self.record_dropped();
            return false;
        }

        if self.drop_on_overflow {
            if self.is_full_for(item_size) {
                self.record_dropped();
                return false;
            }
        } else {
            // The item fits into an empty buffer (checked above), so this
            // loop always ends with enough room.
            while !self.buffer.is_empty() && self.is_full_for(item_size) {
                self.evict_oldest();
            }
        }

        self.buffer.push_back(item);
        let used = self.current_memory.get_mut();
        *used = used.saturating_add(item_size);
        true
    }

    /// Removes and returns every buffered item, oldest first, and resets the
    /// memory accounting. The dropped counter is left untouched.
    pub fn drain(&mut self) -> Vec<T> {
        let items: Vec<T> = self.buffer.drain(..).collect();
        *self.current_memory.get_mut() = 0;
        items
    }

    /// Number of items currently buffered.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` when nothing is buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Total number of items lost so far (rejected on push or evicted).
    pub fn dropped_count(&self) -> u64 {
        self.dropped_count.load(Ordering::Relaxed)
    }

    /// Estimated number of bytes currently buffered.
    pub fn memory_usage(&self) -> usize {
        self.current_memory.load(Ordering::Relaxed)
    }

    /// Returns `true` when adding an item of `item_size` bytes would break
    /// the count limit or the memory limit.
    fn is_full_for(&self, item_size: usize) -> bool {
        let over_memory = self
            .current_memory
            .load(Ordering::Relaxed)
            .checked_add(item_size)
            .map_or(true, |total| total > self.max_memory);

        over_memory || self.buffer.len() >= self.max_size
    }

    /// Counts one lost item.
    fn record_dropped(&self) {
        self.dropped_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes the oldest item, releases its memory and counts it as dropped.
    fn evict_oldest(&mut self)
    where
        T: MemoryEstimator,
    {
        if let Some(evicted) = self.buffer.pop_front() {
            let freed = evicted.estimated_size();
            let used = self.current_memory.get_mut();
            // Saturate so an estimator that reports a larger size now than at
            // push time cannot wrap the counter.
            *used = used.saturating_sub(freed);
            self.record_dropped();
        }
    }
}

/// Types that can report an approximate in-memory size, used by
/// [`MemoryEfficientBuffer`] to enforce its byte budget.
pub trait MemoryEstimator {
    /// Approximate number of bytes occupied by `self`.
    fn estimated_size(&self) -> usize;
}
314
315impl MemoryEstimator for TelemetryData {
316 fn estimated_size(&self) -> usize {
317 self.estimated_size()
318 }
319}
320
321pub struct BatchingManager {
323 config: BatchingConfig,
324 span_buffer: Arc<Mutex<MemoryEfficientBuffer<SpanData>>>,
325 metric_buffer: Arc<Mutex<MemoryEfficientBuffer<MetricData>>>,
326 #[cfg(feature = "structured-logging")]
327 log_buffer: Arc<Mutex<MemoryEfficientBuffer<LogData>>>,
328 last_flush: Arc<RwLock<Instant>>,
329 flush_callback: Arc<
330 Mutex<Option<Box<dyn Fn(Vec<TelemetryData>) -> ObservabilityResult<()> + Send + Sync>>>,
331 >,
332}
333
334impl BatchingManager {
335 pub fn new(config: BatchingConfig) -> Self {
336 let buffer_size = config.max_batch_size;
337 let memory_per_buffer = config.max_memory_bytes / 3; Self {
340 span_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
341 buffer_size,
342 memory_per_buffer,
343 config.drop_on_overflow,
344 ))),
345 metric_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
346 buffer_size,
347 memory_per_buffer,
348 config.drop_on_overflow,
349 ))),
350 #[cfg(feature = "structured-logging")]
351 log_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
352 buffer_size,
353 memory_per_buffer,
354 config.drop_on_overflow,
355 ))),
356 last_flush: Arc::new(RwLock::new(Instant::now())),
357 flush_callback: Arc::new(Mutex::new(None)),
358 config,
359 }
360 }
361
362 pub fn set_flush_callback<F>(&mut self, callback: F)
364 where
365 F: Fn(Vec<TelemetryData>) -> ObservabilityResult<()> + Send + Sync + 'static,
366 {
367 let mut cb = self.flush_callback.lock().unwrap();
368 *cb = Some(Box::new(callback));
369 }
370
371 pub fn add_span(&self, span: SpanData) -> ObservabilityResult<()> {
373 let mut buffer = self
374 .span_buffer
375 .lock()
376 .map_err(|_| ObservabilityError::batching("Failed to acquire span buffer lock"))?;
377
378 if !buffer.push(span) {
379 return Err(ObservabilityError::buffer("Span buffer overflow"));
380 }
381
382 self.check_and_flush()?;
384 Ok(())
385 }
386
387 pub fn add_metric(&self, metric: MetricData) -> ObservabilityResult<()> {
389 let mut buffer = self
390 .metric_buffer
391 .lock()
392 .map_err(|_| ObservabilityError::batching("Failed to acquire metric buffer lock"))?;
393
394 if !buffer.push(metric) {
395 return Err(ObservabilityError::buffer("Metric buffer overflow"));
396 }
397
398 self.check_and_flush()?;
400 Ok(())
401 }
402
403 #[cfg(feature = "structured-logging")]
405 pub fn add_log(&self, log: LogData) -> ObservabilityResult<()> {
406 let mut buffer = self
407 .log_buffer
408 .lock()
409 .map_err(|_| ObservabilityError::batching("Failed to acquire log buffer lock"))?;
410
411 if !buffer.push(log) {
412 return Err(ObservabilityError::buffer("Log buffer overflow"));
413 }
414
415 self.check_and_flush()?;
417 Ok(())
418 }
419
420 fn check_and_flush(&self) -> ObservabilityResult<()> {
422 let should_flush = {
423 let last_flush = self.last_flush.read().unwrap();
424 let elapsed = last_flush.elapsed();
425
426 if elapsed >= self.config.flush_interval {
428 true
429 } else {
430 let span_len = self.span_buffer.lock().unwrap().len();
432 let metric_len = self.metric_buffer.lock().unwrap().len();
433 #[cfg(feature = "structured-logging")]
434 let log_len = self.log_buffer.lock().unwrap().len();
435 #[cfg(not(feature = "structured-logging"))]
436 let log_len = 0;
437
438 let total_items = span_len + metric_len + log_len;
439 total_items >= self.config.min_batch_size
440 }
441 };
442
443 if should_flush {
444 self.flush_all()?;
445 }
446
447 Ok(())
448 }
449
450 pub fn flush_all(&self) -> ObservabilityResult<()> {
452 let mut all_data = Vec::new();
453
454 {
456 let mut span_buffer = self.span_buffer.lock().unwrap();
457 let spans = span_buffer.drain();
458 all_data.extend(spans.into_iter().map(TelemetryData::Span));
459 }
460
461 {
462 let mut metric_buffer = self.metric_buffer.lock().unwrap();
463 let metrics = metric_buffer.drain();
464 all_data.extend(metrics.into_iter().map(TelemetryData::Metric));
465 }
466
467 #[cfg(feature = "structured-logging")]
468 {
469 let mut log_buffer = self.log_buffer.lock().unwrap();
470 let logs = log_buffer.drain();
471 all_data.extend(logs.into_iter().map(TelemetryData::Log));
472 }
473
474 {
476 let mut last_flush = self.last_flush.write().unwrap();
477 *last_flush = Instant::now();
478 }
479
480 if !all_data.is_empty() {
482 if let Some(callback) = self.flush_callback.lock().unwrap().as_ref() {
483 callback(all_data)?;
484 }
485 }
486
487 Ok(())
488 }
489
490 pub fn get_stats(&self) -> BatchingStats {
492 let span_buffer = self.span_buffer.lock().unwrap();
493 let metric_buffer = self.metric_buffer.lock().unwrap();
494
495 #[cfg(feature = "structured-logging")]
496 let log_buffer = self.log_buffer.lock().unwrap();
497 #[cfg(not(feature = "structured-logging"))]
498 let log_buffer_len = 0;
499 #[cfg(not(feature = "structured-logging"))]
500 let log_dropped = 0;
501 #[cfg(not(feature = "structured-logging"))]
502 let log_memory = 0;
503
504 BatchingStats {
505 span_count: span_buffer.len(),
506 metric_count: metric_buffer.len(),
507 #[cfg(feature = "structured-logging")]
508 log_count: log_buffer.len(),
509 #[cfg(not(feature = "structured-logging"))]
510 log_count: log_buffer_len,
511 span_dropped: span_buffer.dropped_count(),
512 metric_dropped: metric_buffer.dropped_count(),
513 #[cfg(feature = "structured-logging")]
514 log_dropped: log_buffer.dropped_count(),
515 #[cfg(not(feature = "structured-logging"))]
516 log_dropped,
517 memory_usage: {
518 let mut total = span_buffer.memory_usage() + metric_buffer.memory_usage();
519 #[cfg(feature = "structured-logging")]
520 {
521 total += log_buffer.memory_usage();
522 }
523 #[cfg(not(feature = "structured-logging"))]
524 {
525 total += log_memory;
526 }
527 total
528 },
529 last_flush: *self.last_flush.read().unwrap(),
530 }
531 }
532}
533
/// Point-in-time snapshot of the batching buffers.
#[derive(Clone, Debug)]
pub struct BatchingStats {
    /// Spans currently buffered.
    pub span_count: usize,
    /// Metric samples currently buffered.
    pub metric_count: usize,
    /// Log records currently buffered.
    pub log_count: usize,
    /// Drop counter of the span buffer.
    pub span_dropped: u64,
    /// Drop counter of the metric buffer.
    pub metric_dropped: u64,
    /// Drop counter of the log buffer.
    pub log_dropped: u64,
    /// Estimated bytes held across all buffers.
    pub memory_usage: usize,
    /// Instant of the most recent flush.
    pub last_flush: Instant,
}

impl BatchingStats {
    /// Number of buffered items across spans, metrics and logs.
    pub fn total_items(&self) -> usize {
        let per_kind = [self.span_count, self.metric_count, self.log_count];
        per_kind.iter().sum()
    }

    /// Number of dropped items across spans, metrics and logs.
    pub fn total_dropped(&self) -> u64 {
        let per_kind = [self.span_dropped, self.metric_dropped, self.log_dropped];
        per_kind.iter().sum()
    }
}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a label-free counter sample for the buffer tests.
    fn counter(name: &str, value: f64) -> MetricData {
        MetricData::new(name.to_string(), value, HashMap::new(), MetricType::Counter)
    }

    /// A drop-on-overflow buffer accepts items up to its capacity and rejects
    /// (and counts) the next one.
    #[test]
    fn test_memory_efficient_buffer() {
        let mut buffer = MemoryEfficientBuffer::new(3, 1000, true);

        for (name, value) in [("test1", 1.0), ("test2", 2.0), ("test3", 3.0)] {
            assert!(buffer.push(counter(name, value)));
        }

        let accepted = buffer.push(counter("test4", 4.0));
        assert!(!accepted);
        assert_eq!(buffer.dropped_count(), 1);
        assert_eq!(buffer.len(), 3);
    }

    /// A freshly built manager reports empty buffers and no drops.
    #[test]
    fn test_batching_manager() {
        let config = BatchingConfig {
            max_batch_size: 5,
            min_batch_size: 10,
            flush_interval: Duration::from_secs(60 * 60),
            ..Default::default()
        };

        let stats = BatchingManager::new(config).get_stats();

        assert_eq!(stats.metric_count, 0);
        assert_eq!(stats.total_items(), 0);
        assert_eq!(stats.total_dropped(), 0);
    }

    /// `SpanData::new` plus `with_parent` populate the identifying fields.
    #[test]
    fn test_span_data_creation() {
        let span_id = "span1".to_string();
        let trace_id = "trace1".to_string();
        let name = "test_span".to_string();

        let span = SpanData::new(span_id, trace_id, name).with_parent("parent1".to_string());

        assert_eq!(span.span_id, "span1");
        assert_eq!(span.trace_id, "trace1");
        assert_eq!(span.parent_span_id.as_deref(), Some("parent1"));
        assert_eq!(span.name, "test_span");
    }

    /// `MetricData::new` stores its arguments unchanged.
    #[test]
    fn test_metric_data_creation() {
        let labels: HashMap<String, String> =
            std::iter::once(("env".to_string(), "test".to_string())).collect();

        let metric = MetricData::new(
            "test_metric".to_string(),
            42.0,
            labels.clone(),
            MetricType::Gauge,
        );

        assert_eq!(metric.name, "test_metric");
        assert_eq!(metric.value, 42.0);
        assert_eq!(metric.labels, labels);
        assert!(matches!(metric.metric_type, MetricType::Gauge));
    }

    /// The default configuration carries the documented values.
    #[test]
    fn test_batching_config() {
        let BatchingConfig {
            max_batch_size,
            flush_interval,
            max_memory_bytes,
            drop_on_overflow,
            min_batch_size,
        } = BatchingConfig::default();

        assert_eq!(max_batch_size, 100);
        assert_eq!(flush_interval, Duration::from_secs(5));
        assert_eq!(max_memory_bytes, 1024 * 1024);
        assert!(drop_on_overflow);
        assert_eq!(min_batch_size, 10);
    }

    /// Every telemetry variant reports a non-zero size estimate.
    #[test]
    fn test_telemetry_data_size_estimation() {
        let span_data = TelemetryData::Span(SpanData::new(
            "test".to_string(),
            "trace".to_string(),
            "span".to_string(),
        ));
        assert!(span_data.estimated_size() > 0);

        let metric_data = TelemetryData::Metric(counter("test", 1.0));
        assert!(metric_data.estimated_size() > 0);
    }
}