// observability_core/batching.rs

//! Smart batching system for telemetry data to optimize performance
2
3use crate::error::{ObservabilityError, ObservabilityResult};
4use crate::traits::{LogLevel, SpanStatus};
5use std::collections::{HashMap, VecDeque};
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::sync::{Arc, Mutex, RwLock};
8use web_time::{Duration, Instant};
9
10#[cfg(feature = "structured-logging")]
11use serde_json::Value as JsonValue;
12
/// Settings that govern how telemetry is buffered and when it is flushed
#[derive(Debug, Clone)]
pub struct BatchingConfig {
    /// Upper bound on the number of items in a batch
    pub max_batch_size: usize,
    /// Longest time to wait before a batch is flushed
    pub flush_interval: Duration,
    /// Approximate ceiling on buffered memory, in bytes
    pub max_memory_bytes: usize,
    /// Whether items are dropped when a buffer is full
    pub drop_on_overflow: bool,
    /// Number of buffered items that triggers an early flush
    pub min_batch_size: usize,
}

impl Default for BatchingConfig {
    /// Returns the stock configuration: batches of up to 100 items, a
    /// 5 second flush interval, a 1 MiB memory ceiling, drop-on-overflow
    /// enabled, and an early flush once 10 items are buffered.
    fn default() -> Self {
        const ONE_MEBIBYTE: usize = 1024 * 1024;

        let flush_interval = Duration::from_secs(5);

        BatchingConfig {
            min_batch_size: 10,
            max_batch_size: 100,
            max_memory_bytes: ONE_MEBIBYTE,
            drop_on_overflow: true,
            flush_interval,
        }
    }
}
39
40/// Telemetry data types for batching
41#[derive(Debug, Clone)]
42pub enum TelemetryData {
43    Span(SpanData),
44    Metric(MetricData),
45    #[cfg(feature = "structured-logging")]
46    Log(LogData),
47}
48
49impl TelemetryData {
50    /// Estimate memory usage of this telemetry data
51    pub fn estimated_size(&self) -> usize {
52        match self {
53            TelemetryData::Span(span) => span.estimated_size(),
54            TelemetryData::Metric(metric) => metric.estimated_size(),
55            #[cfg(feature = "structured-logging")]
56            TelemetryData::Log(log) => log.estimated_size(),
57        }
58    }
59}
60
61impl MemoryEstimator for SpanData {
62    fn estimated_size(&self) -> usize {
63        self.estimated_size()
64    }
65}
66
67impl MemoryEstimator for MetricData {
68    fn estimated_size(&self) -> usize {
69        self.estimated_size()
70    }
71}
72
#[cfg(feature = "structured-logging")]
impl MemoryEstimator for LogData {
    /// Reports the log record's approximate footprint in bytes
    fn estimated_size(&self) -> usize {
        // Path syntax selects the inherent `LogData::estimated_size`,
        // so this does not call back into the trait method.
        LogData::estimated_size(self)
    }
}
79
80/// Span data for batching
81#[derive(Debug, Clone)]
82pub struct SpanData {
83    pub span_id: String,
84    pub trace_id: String,
85    pub parent_span_id: Option<String>,
86    pub name: String,
87    pub start_time: Instant,
88    pub end_time: Option<Instant>,
89    pub status: SpanStatus,
90    pub attributes: HashMap<String, String>,
91}
92
93impl SpanData {
94    pub fn new(span_id: String, trace_id: String, name: String) -> Self {
95        Self {
96            span_id,
97            trace_id,
98            parent_span_id: None,
99            name,
100            start_time: Instant::now(),
101            end_time: None,
102            status: SpanStatus::Ok,
103            attributes: HashMap::new(),
104        }
105    }
106
107    pub fn with_parent(mut self, parent_span_id: String) -> Self {
108        self.parent_span_id = Some(parent_span_id);
109        self
110    }
111
112    pub fn add_attribute(&mut self, key: String, value: String) {
113        self.attributes.insert(key, value);
114    }
115
116    pub fn end(&mut self) {
117        self.end_time = Some(Instant::now());
118    }
119
120    pub fn duration(&self) -> Option<Duration> {
121        self.end_time.map(|end| end.duration_since(self.start_time))
122    }
123
124    fn estimated_size(&self) -> usize {
125        std::mem::size_of::<Self>()
126            + self.span_id.len()
127            + self.trace_id.len()
128            + self.parent_span_id.as_ref().map_or(0, |s| s.len())
129            + self.name.len()
130            + self
131                .attributes
132                .iter()
133                .map(|(k, v)| k.len() + v.len())
134                .sum::<usize>()
135    }
136}
137
/// A metric sample in the form it is held while waiting to be flushed
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric name
    pub name: String,
    /// Sample value
    pub value: f64,
    /// Label key/value pairs attached to the sample
    pub labels: HashMap<String, String>,
    /// Instant captured when the sample was constructed
    pub timestamp: Instant,
    /// Kind of metric this sample belongs to
    pub metric_type: MetricType,
}

/// Kinds of metric a [`MetricData`] sample can represent
#[derive(Debug, Clone)]
pub enum MetricType {
    Counter,
    Histogram,
    Gauge,
}

impl MetricData {
    /// Creates a sample stamped with the current instant
    pub fn new(
        name: String,
        value: f64,
        labels: HashMap<String, String>,
        metric_type: MetricType,
    ) -> Self {
        let recorded_at = Instant::now();

        MetricData {
            name,
            value,
            labels,
            metric_type,
            timestamp: recorded_at,
        }
    }

    /// Approximate footprint: the struct itself plus the byte lengths of
    /// the name and of every label key and value.
    fn estimated_size(&self) -> usize {
        let label_bytes: usize = self
            .labels
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum();

        std::mem::size_of::<Self>() + self.name.len() + label_bytes
    }
}
181
/// A structured log record in the form it is held while waiting to be flushed
#[cfg(feature = "structured-logging")]
#[derive(Debug, Clone)]
pub struct LogData {
    /// Severity of the record
    pub level: LogLevel,
    /// Log message text
    pub message: String,
    /// JSON fields attached to the record
    pub fields: JsonValue,
    /// Instant captured when the record was constructed
    pub timestamp: Instant,
    /// Trace id; `None` until `with_trace_context` is used
    pub trace_id: Option<String>,
    /// Span id; `None` until `with_trace_context` is used
    pub span_id: Option<String>,
}
193
#[cfg(feature = "structured-logging")]
impl LogData {
    /// Creates a log record stamped with the current instant and without
    /// any trace context.
    pub fn new(level: LogLevel, message: String, fields: JsonValue) -> Self {
        Self {
            level,
            message,
            fields,
            timestamp: Instant::now(),
            trace_id: None,
            span_id: None,
        }
    }

    /// Returns the record with both trace and span ids attached
    pub fn with_trace_context(mut self, trace_id: String, span_id: String) -> Self {
        self.trace_id = Some(trace_id);
        self.span_id = Some(span_id);
        self
    }

    /// Approximate footprint: the struct itself plus the byte lengths of the
    /// message, the serialized JSON fields and the optional ids.
    ///
    /// The JSON length is measured by streaming the value's `Display` output
    /// into a counting sink, so no temporary `String` is allocated. The
    /// resulting number is the same as `self.fields.to_string().len()`.
    fn estimated_size(&self) -> usize {
        // Sink that tallies the bytes `Display` emits without storing them.
        struct ByteCounter(usize);

        impl std::fmt::Write for ByteCounter {
            fn write_str(&mut self, chunk: &str) -> std::fmt::Result {
                self.0 += chunk.len();
                Ok(())
            }
        }

        let mut json_bytes = ByteCounter(0);
        // The sink itself never fails. If the value's `Display` impl reports
        // an error, the bytes counted so far are used; this is an estimate.
        let _ = std::fmt::write(&mut json_bytes, format_args!("{}", self.fields));

        std::mem::size_of::<Self>()
            + self.message.len()
            + json_bytes.0
            + self.trace_id.as_ref().map_or(0, |s| s.len())
            + self.span_id.as_ref().map_or(0, |s| s.len())
    }
}
221
/// FIFO buffer bounded both by item count and by approximate memory usage.
///
/// When a limit would be exceeded the buffer either rejects the new item
/// (`drop_on_overflow == true`) or evicts the oldest items to make room
/// (`drop_on_overflow == false`). Every item lost either way is counted in
/// `dropped_count`.
pub struct MemoryEfficientBuffer<T> {
    buffer: VecDeque<T>,
    max_size: usize,
    current_memory: AtomicUsize,
    max_memory: usize,
    dropped_count: AtomicU64,
    drop_on_overflow: bool,
}

impl<T> MemoryEfficientBuffer<T>
where
    T: Clone,
{
    /// Creates an empty buffer holding at most `max_size` items and roughly
    /// `max_memory` bytes.
    pub fn new(max_size: usize, max_memory: usize, drop_on_overflow: bool) -> Self {
        Self {
            buffer: VecDeque::with_capacity(max_size.min(1000)), // Cap initial allocation
            max_size,
            current_memory: AtomicUsize::new(0),
            max_memory,
            dropped_count: AtomicU64::new(0),
            drop_on_overflow,
        }
    }

    /// Appends `item`, enforcing the count and memory limits.
    ///
    /// Returns `true` if the item was stored and `false` if it was rejected.
    /// In eviction mode the oldest items are removed until the new one fits.
    /// An item that could never fit (larger than `max_memory`, or any item
    /// when `max_size` is 0) is rejected without touching buffered data.
    pub fn push(&mut self, item: T) -> bool
    where
        T: MemoryEstimator,
    {
        let item_size = item.estimated_size();

        // Evicting cannot make room for an item that exceeds the limits on
        // its own, so reject it before any buffered data is sacrificed.
        if self.max_size == 0 || item_size > self.max_memory {
            self.record_drop();
            return false;
        }

        if self.drop_on_overflow {
            if self.buffer.len() >= self.max_size || self.would_exceed_memory(item_size) {
                self.record_drop();
                return false;
            }
        } else {
            // Remove oldest items until both limits admit the new one.
            while !self.buffer.is_empty()
                && (self.buffer.len() >= self.max_size || self.would_exceed_memory(item_size))
            {
                self.evict_oldest();
            }
        }

        self.buffer.push_back(item);
        // `&mut self` guarantees exclusive access, so plain arithmetic on the
        // counter is sufficient here.
        let used = self.current_memory.get_mut();
        *used = used.saturating_add(item_size);
        true
    }

    /// Removes and returns every buffered item, oldest first, and resets the
    /// memory counter.
    pub fn drain(&mut self) -> Vec<T> {
        let items: Vec<T> = self.buffer.drain(..).collect();
        self.current_memory.store(0, Ordering::Relaxed);
        items
    }

    /// Number of items currently buffered
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Whether the buffer holds no items
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Number of items lost so far, whether rejected on push or evicted
    pub fn dropped_count(&self) -> u64 {
        self.dropped_count.load(Ordering::Relaxed)
    }

    /// Sum of the estimated sizes of the buffered items
    pub fn memory_usage(&self) -> usize {
        self.current_memory.load(Ordering::Relaxed)
    }

    /// Whether storing `item_size` more bytes would pass the memory limit
    fn would_exceed_memory(&self, item_size: usize) -> bool {
        self.current_memory
            .load(Ordering::Relaxed)
            .saturating_add(item_size)
            > self.max_memory
    }

    /// Counts one lost item
    fn record_drop(&self) {
        self.dropped_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes the oldest item, releases its memory and counts it as dropped
    fn evict_oldest(&mut self)
    where
        T: MemoryEstimator,
    {
        if let Some(old_item) = self.buffer.pop_front() {
            let old_size = old_item.estimated_size();
            let used = self.current_memory.get_mut();
            // Saturate so an inconsistent estimate cannot wrap the counter.
            *used = used.saturating_sub(old_size);
            self.record_drop();
        }
    }
}

/// Trait for estimating memory usage
pub trait MemoryEstimator {
    /// Approximate number of bytes the value occupies
    fn estimated_size(&self) -> usize;
}
314
315impl MemoryEstimator for TelemetryData {
316    fn estimated_size(&self) -> usize {
317        self.estimated_size()
318    }
319}
320
321/// Batching manager for telemetry data
322pub struct BatchingManager {
323    config: BatchingConfig,
324    span_buffer: Arc<Mutex<MemoryEfficientBuffer<SpanData>>>,
325    metric_buffer: Arc<Mutex<MemoryEfficientBuffer<MetricData>>>,
326    #[cfg(feature = "structured-logging")]
327    log_buffer: Arc<Mutex<MemoryEfficientBuffer<LogData>>>,
328    last_flush: Arc<RwLock<Instant>>,
329    flush_callback: Arc<
330        Mutex<Option<Box<dyn Fn(Vec<TelemetryData>) -> ObservabilityResult<()> + Send + Sync>>>,
331    >,
332}
333
334impl BatchingManager {
335    pub fn new(config: BatchingConfig) -> Self {
336        let buffer_size = config.max_batch_size;
337        let memory_per_buffer = config.max_memory_bytes / 3; // Divide among span, metric, log buffers
338
339        Self {
340            span_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
341                buffer_size,
342                memory_per_buffer,
343                config.drop_on_overflow,
344            ))),
345            metric_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
346                buffer_size,
347                memory_per_buffer,
348                config.drop_on_overflow,
349            ))),
350            #[cfg(feature = "structured-logging")]
351            log_buffer: Arc::new(Mutex::new(MemoryEfficientBuffer::new(
352                buffer_size,
353                memory_per_buffer,
354                config.drop_on_overflow,
355            ))),
356            last_flush: Arc::new(RwLock::new(Instant::now())),
357            flush_callback: Arc::new(Mutex::new(None)),
358            config,
359        }
360    }
361
362    /// Set the flush callback function
363    pub fn set_flush_callback<F>(&mut self, callback: F)
364    where
365        F: Fn(Vec<TelemetryData>) -> ObservabilityResult<()> + Send + Sync + 'static,
366    {
367        let mut cb = self.flush_callback.lock().unwrap();
368        *cb = Some(Box::new(callback));
369    }
370
371    /// Add a span to the batch
372    pub fn add_span(&self, span: SpanData) -> ObservabilityResult<()> {
373        let mut buffer = self
374            .span_buffer
375            .lock()
376            .map_err(|_| ObservabilityError::batching("Failed to acquire span buffer lock"))?;
377
378        if !buffer.push(span) {
379            return Err(ObservabilityError::buffer("Span buffer overflow"));
380        }
381
382        // Check if we should flush
383        self.check_and_flush()?;
384        Ok(())
385    }
386
387    /// Add a metric to the batch
388    pub fn add_metric(&self, metric: MetricData) -> ObservabilityResult<()> {
389        let mut buffer = self
390            .metric_buffer
391            .lock()
392            .map_err(|_| ObservabilityError::batching("Failed to acquire metric buffer lock"))?;
393
394        if !buffer.push(metric) {
395            return Err(ObservabilityError::buffer("Metric buffer overflow"));
396        }
397
398        // Check if we should flush
399        self.check_and_flush()?;
400        Ok(())
401    }
402
403    /// Add a log to the batch
404    #[cfg(feature = "structured-logging")]
405    pub fn add_log(&self, log: LogData) -> ObservabilityResult<()> {
406        let mut buffer = self
407            .log_buffer
408            .lock()
409            .map_err(|_| ObservabilityError::batching("Failed to acquire log buffer lock"))?;
410
411        if !buffer.push(log) {
412            return Err(ObservabilityError::buffer("Log buffer overflow"));
413        }
414
415        // Check if we should flush
416        self.check_and_flush()?;
417        Ok(())
418    }
419
420    /// Check if buffers should be flushed and flush if necessary
421    fn check_and_flush(&self) -> ObservabilityResult<()> {
422        let should_flush = {
423            let last_flush = self.last_flush.read().unwrap();
424            let elapsed = last_flush.elapsed();
425
426            // Check time-based flush
427            if elapsed >= self.config.flush_interval {
428                true
429            } else {
430                // Check size-based flush
431                let span_len = self.span_buffer.lock().unwrap().len();
432                let metric_len = self.metric_buffer.lock().unwrap().len();
433                #[cfg(feature = "structured-logging")]
434                let log_len = self.log_buffer.lock().unwrap().len();
435                #[cfg(not(feature = "structured-logging"))]
436                let log_len = 0;
437
438                let total_items = span_len + metric_len + log_len;
439                total_items >= self.config.min_batch_size
440            }
441        };
442
443        if should_flush {
444            self.flush_all()?;
445        }
446
447        Ok(())
448    }
449
450    /// Force flush all buffers
451    pub fn flush_all(&self) -> ObservabilityResult<()> {
452        let mut all_data = Vec::new();
453
454        // Drain all buffers
455        {
456            let mut span_buffer = self.span_buffer.lock().unwrap();
457            let spans = span_buffer.drain();
458            all_data.extend(spans.into_iter().map(TelemetryData::Span));
459        }
460
461        {
462            let mut metric_buffer = self.metric_buffer.lock().unwrap();
463            let metrics = metric_buffer.drain();
464            all_data.extend(metrics.into_iter().map(TelemetryData::Metric));
465        }
466
467        #[cfg(feature = "structured-logging")]
468        {
469            let mut log_buffer = self.log_buffer.lock().unwrap();
470            let logs = log_buffer.drain();
471            all_data.extend(logs.into_iter().map(TelemetryData::Log));
472        }
473
474        // Update last flush time
475        {
476            let mut last_flush = self.last_flush.write().unwrap();
477            *last_flush = Instant::now();
478        }
479
480        // Call flush callback if data exists
481        if !all_data.is_empty() {
482            if let Some(callback) = self.flush_callback.lock().unwrap().as_ref() {
483                callback(all_data)?;
484            }
485        }
486
487        Ok(())
488    }
489
490    /// Get buffer statistics
491    pub fn get_stats(&self) -> BatchingStats {
492        let span_buffer = self.span_buffer.lock().unwrap();
493        let metric_buffer = self.metric_buffer.lock().unwrap();
494
495        #[cfg(feature = "structured-logging")]
496        let log_buffer = self.log_buffer.lock().unwrap();
497        #[cfg(not(feature = "structured-logging"))]
498        let log_buffer_len = 0;
499        #[cfg(not(feature = "structured-logging"))]
500        let log_dropped = 0;
501        #[cfg(not(feature = "structured-logging"))]
502        let log_memory = 0;
503
504        BatchingStats {
505            span_count: span_buffer.len(),
506            metric_count: metric_buffer.len(),
507            #[cfg(feature = "structured-logging")]
508            log_count: log_buffer.len(),
509            #[cfg(not(feature = "structured-logging"))]
510            log_count: log_buffer_len,
511            span_dropped: span_buffer.dropped_count(),
512            metric_dropped: metric_buffer.dropped_count(),
513            #[cfg(feature = "structured-logging")]
514            log_dropped: log_buffer.dropped_count(),
515            #[cfg(not(feature = "structured-logging"))]
516            log_dropped,
517            memory_usage: {
518                let mut total = span_buffer.memory_usage() + metric_buffer.memory_usage();
519                #[cfg(feature = "structured-logging")]
520                {
521                    total += log_buffer.memory_usage();
522                }
523                #[cfg(not(feature = "structured-logging"))]
524                {
525                    total += log_memory;
526                }
527                total
528            },
529            last_flush: *self.last_flush.read().unwrap(),
530        }
531    }
532}
533
/// Point-in-time snapshot of the batching buffers
#[derive(Debug, Clone)]
pub struct BatchingStats {
    /// Spans currently buffered
    pub span_count: usize,
    /// Metrics currently buffered
    pub metric_count: usize,
    /// Log records currently buffered
    pub log_count: usize,
    /// Spans dropped so far
    pub span_dropped: u64,
    /// Metrics dropped so far
    pub metric_dropped: u64,
    /// Log records dropped so far
    pub log_dropped: u64,
    /// Combined estimated memory usage of all buffers, in bytes
    pub memory_usage: usize,
    /// Instant of the most recent flush
    pub last_flush: Instant,
}

impl BatchingStats {
    /// Number of buffered items across spans, metrics and logs
    pub fn total_items(&self) -> usize {
        [self.span_count, self.metric_count, self.log_count]
            .iter()
            .sum()
    }

    /// Number of dropped items across spans, metrics and logs
    pub fn total_dropped(&self) -> u64 {
        [self.span_dropped, self.metric_dropped, self.log_dropped]
            .iter()
            .sum()
    }
}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an unlabeled counter sample for the buffer test
    fn counter(name: &str, value: f64) -> MetricData {
        MetricData::new(name.to_string(), value, HashMap::new(), MetricType::Counter)
    }

    #[test]
    fn test_memory_efficient_buffer() {
        let mut buffer = MemoryEfficientBuffer::new(3, 1000, true);

        assert!(buffer.push(counter("test1", 1.0)));
        assert!(buffer.push(counter("test2", 2.0)));
        assert!(buffer.push(counter("test3", 3.0)));

        // Should drop when buffer is full
        assert!(!buffer.push(counter("test4", 4.0)));
        assert_eq!(buffer.dropped_count(), 1);
        assert_eq!(buffer.len(), 3);
    }

    #[test]
    fn test_batching_manager() {
        let config = BatchingConfig {
            max_batch_size: 5,
            min_batch_size: 10, // Set high to avoid auto-flush
            // One hour, set very high to avoid time-based flush. Spelled in
            // seconds so the test also builds on toolchains that do not
            // provide `Duration::from_hours`.
            flush_interval: Duration::from_secs(60 * 60),
            ..Default::default()
        };

        let manager = BatchingManager::new(config);

        // Test that we can get stats without adding anything
        let stats = manager.get_stats();
        assert_eq!(stats.metric_count, 0);
        assert_eq!(stats.total_items(), 0);

        // Test stats calculation methods
        assert_eq!(stats.total_dropped(), 0);
    }

    #[test]
    fn test_span_data_creation() {
        let span = SpanData::new(
            "span1".to_string(),
            "trace1".to_string(),
            "test_span".to_string(),
        )
        .with_parent("parent1".to_string());

        assert_eq!(span.span_id, "span1");
        assert_eq!(span.trace_id, "trace1");
        assert_eq!(span.parent_span_id, Some("parent1".to_string()));
        assert_eq!(span.name, "test_span");
    }

    #[test]
    fn test_metric_data_creation() {
        let mut labels = HashMap::new();
        labels.insert("env".to_string(), "test".to_string());

        let metric = MetricData::new(
            "test_metric".to_string(),
            42.0,
            labels.clone(),
            MetricType::Gauge,
        );

        assert_eq!(metric.name, "test_metric");
        assert_eq!(metric.value, 42.0);
        assert_eq!(metric.labels, labels);
        assert!(matches!(metric.metric_type, MetricType::Gauge));
    }

    #[test]
    fn test_batching_config() {
        let config = BatchingConfig::default();
        assert_eq!(config.max_batch_size, 100);
        assert_eq!(config.flush_interval, Duration::from_secs(5));
        assert_eq!(config.max_memory_bytes, 1024 * 1024);
        assert!(config.drop_on_overflow);
        assert_eq!(config.min_batch_size, 10);
    }

    #[test]
    fn test_telemetry_data_size_estimation() {
        let span = SpanData::new("test".to_string(), "trace".to_string(), "span".to_string());
        let span_data = TelemetryData::Span(span);
        assert!(span_data.estimated_size() > 0);

        let metric = MetricData::new("test".to_string(), 1.0, HashMap::new(), MetricType::Counter);
        let metric_data = TelemetryData::Metric(metric);
        assert!(metric_data.estimated_size() > 0);
    }
}