memscope_rs/unified/
tracking_dispatcher.rs

1// Tracking Strategy Dispatcher
2// Intelligently routes memory tracking requests to optimal tracking implementations
3// Manages lifecycle of different tracking strategies and data aggregation
4
5use crate::lockfree::aggregator::LockfreeAggregator;
6use crate::unified::backend::{BackendError, RuntimeEnvironment, TrackingStrategy};
7use std::collections::HashMap;
8use std::sync::Arc;
9use thiserror::Error;
10use tracing::{debug, error, info};
11
12/// Central dispatcher that routes tracking operations to appropriate implementations
13/// Maintains active tracking strategies and coordinates data collection
14pub struct TrackingDispatcher {
15    /// Currently active tracking strategy
16    active_strategy: Option<TrackingStrategy>,
17    /// Strategy-specific tracker implementations
18    tracker_registry: TrackerRegistry,
19    /// Shared data aggregator for all tracking sources
20    #[allow(dead_code)]
21    aggregator: Arc<LockfreeAggregator>,
22    /// Dispatcher configuration
23    #[allow(dead_code)]
24    config: DispatcherConfig,
25    /// Performance metrics
26    metrics: DispatcherMetrics,
27}
28
29/// Registry of available tracker implementations
30/// Maintains strategy-specific trackers and their lifecycle
31struct TrackerRegistry {
32    /// Single-threaded tracker instance
33    single_thread_tracker: Option<Box<dyn MemoryTracker>>,
34    /// Multi-threaded tracker instance
35    multi_thread_tracker: Option<Box<dyn MemoryTracker>>,
36    /// Async tracker instance
37    async_tracker: Option<Box<dyn MemoryTracker>>,
38    /// Hybrid tracker instance
39    hybrid_tracker: Option<Box<dyn MemoryTracker>>,
40}
41
42/// Configuration for dispatcher behavior
43#[derive(Debug, Clone)]
44pub struct DispatcherConfig {
45    /// Enable automatic strategy switching
46    pub auto_switch_strategies: bool,
47    /// Maximum number of concurrent trackers
48    pub max_concurrent_trackers: usize,
49    /// Performance monitoring interval
50    pub metrics_interval_ms: u64,
51    /// Memory usage threshold for strategy switching
52    pub memory_threshold_mb: usize,
53}
54
55/// Performance metrics for dispatcher operations
56#[derive(Debug, Clone)]
57pub struct DispatcherMetrics {
58    /// Total tracking operations dispatched
59    pub total_dispatches: u64,
60    /// Strategy switch count
61    pub strategy_switches: u64,
62    /// Average dispatch latency in microseconds
63    pub avg_dispatch_latency_us: f64,
64    /// Current memory overhead percentage
65    pub memory_overhead_percent: f64,
66    /// Active tracker count
67    pub active_trackers: usize,
68}
69
70/// Unified memory tracker interface
71/// All tracking implementations must implement this trait for dispatcher compatibility
72pub trait MemoryTracker: Send + Sync {
73    /// Initialize tracker with given configuration
74    fn initialize(&mut self, config: TrackerConfig) -> Result<(), TrackerError>;
75
76    /// Start tracking memory operations
77    fn start_tracking(&mut self) -> Result<(), TrackerError>;
78
79    /// Stop tracking and collect data
80    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError>;
81
82    /// Get current tracking statistics
83    fn get_statistics(&self) -> TrackerStatistics;
84
85    /// Check if tracker is currently active
86    fn is_active(&self) -> bool;
87
88    /// Get tracker type identifier
89    fn tracker_type(&self) -> TrackerType;
90}
91
92/// Configuration for individual tracker instances
93#[derive(Debug, Clone)]
94pub struct TrackerConfig {
95    /// Sampling rate (0.0 to 1.0)
96    pub sample_rate: f64,
97    /// Maximum memory overhead allowed
98    pub max_overhead_mb: usize,
99    /// Thread affinity settings
100    pub thread_affinity: Option<Vec<usize>>,
101    /// Custom tracking parameters
102    pub custom_params: HashMap<String, String>,
103}
104
105/// Statistics from individual tracker
106#[derive(Debug, Clone)]
107pub struct TrackerStatistics {
108    /// Allocations tracked by this tracker
109    pub allocations_tracked: u64,
110    /// Memory currently tracked (bytes)
111    pub memory_tracked_bytes: u64,
112    /// Tracker overhead (bytes)
113    pub overhead_bytes: u64,
114    /// Tracking duration (milliseconds)
115    pub tracking_duration_ms: u64,
116}
117
118/// Tracker type identifiers
119#[derive(Debug, Clone, PartialEq)]
120pub enum TrackerType {
121    /// Single-threaded global tracker
122    SingleThread,
123    /// Multi-threaded with thread-local storage
124    MultiThread,
125    /// Async-aware task-local tracker
126    AsyncTracker,
127    /// Hybrid multi-thread + async tracker
128    HybridTracker,
129}
130
131/// Errors specific to tracker operations
132#[derive(Error, Debug)]
133pub enum TrackerError {
134    /// Tracker initialization failed
135    #[error("Tracker initialization failed: {reason}")]
136    InitializationFailed { reason: String },
137
138    /// Tracking start failed
139    #[error("Failed to start tracking: {reason}")]
140    StartFailed { reason: String },
141
142    /// Data collection failed
143    #[error("Failed to collect tracking data: {reason}")]
144    DataCollectionFailed { reason: String },
145
146    /// Tracker configuration invalid
147    #[error("Invalid tracker configuration: {reason}")]
148    InvalidConfiguration { reason: String },
149
150    /// JSON serialization/deserialization error
151    #[error("JSON processing error: {source}")]
152    JsonError {
153        #[from]
154        source: serde_json::Error,
155    },
156}
157
158impl Default for DispatcherConfig {
159    /// Default dispatcher configuration optimized for most use cases
160    fn default() -> Self {
161        Self {
162            auto_switch_strategies: true,
163            max_concurrent_trackers: 4,
164            metrics_interval_ms: 1000,
165            memory_threshold_mb: 100,
166        }
167    }
168}
169
170impl Default for DispatcherMetrics {
171    /// Initialize metrics with zero values
172    fn default() -> Self {
173        Self {
174            total_dispatches: 0,
175            strategy_switches: 0,
176            avg_dispatch_latency_us: 0.0,
177            memory_overhead_percent: 0.0,
178            active_trackers: 0,
179        }
180    }
181}
182
183impl Default for TrackerConfig {
184    /// Default tracker configuration
185    fn default() -> Self {
186        Self {
187            sample_rate: 1.0,
188            max_overhead_mb: 50,
189            thread_affinity: None,
190            custom_params: HashMap::new(),
191        }
192    }
193}
194
195impl TrackerRegistry {
196    /// Create new empty tracker registry
197    fn new() -> Self {
198        Self {
199            single_thread_tracker: None,
200            multi_thread_tracker: None,
201            async_tracker: None,
202            hybrid_tracker: None,
203        }
204    }
205
206    /// Get tracker for specified type, creating if necessary
207    fn get_or_create_tracker(
208        &mut self,
209        tracker_type: TrackerType,
210    ) -> Result<&mut Box<dyn MemoryTracker>, BackendError> {
211        let tracker = match tracker_type {
212            TrackerType::SingleThread => &mut self.single_thread_tracker,
213            TrackerType::MultiThread => &mut self.multi_thread_tracker,
214            TrackerType::AsyncTracker => &mut self.async_tracker,
215            TrackerType::HybridTracker => &mut self.hybrid_tracker,
216        };
217
218        if tracker.is_none() {
219            *tracker = Some(Self::create_tracker(&tracker_type)?);
220        }
221
222        tracker
223            .as_mut()
224            .ok_or_else(|| BackendError::TrackingInitializationFailed {
225                reason: format!("Failed to create {:?} tracker", tracker_type),
226            })
227    }
228
229    /// Create new tracker instance of specified type
230    fn create_tracker(tracker_type: &TrackerType) -> Result<Box<dyn MemoryTracker>, BackendError> {
231        match tracker_type {
232            TrackerType::SingleThread => Ok(Box::new(SingleThreadTracker::new())),
233            TrackerType::MultiThread => Ok(Box::new(MultiThreadTracker::new())),
234            TrackerType::AsyncTracker => Ok(Box::new(AsyncTrackerWrapper::new())),
235            TrackerType::HybridTracker => Ok(Box::new(HybridTrackerImpl::new())),
236        }
237    }
238
239    /// Count currently active trackers
240    fn count_active_trackers(&self) -> usize {
241        let mut count = 0;
242        if let Some(ref tracker) = self.single_thread_tracker {
243            if tracker.is_active() {
244                count += 1;
245            }
246        }
247        if let Some(ref tracker) = self.multi_thread_tracker {
248            if tracker.is_active() {
249                count += 1;
250            }
251        }
252        if let Some(ref tracker) = self.async_tracker {
253            if tracker.is_active() {
254                count += 1;
255            }
256        }
257        if let Some(ref tracker) = self.hybrid_tracker {
258            if tracker.is_active() {
259                count += 1;
260            }
261        }
262        count
263    }
264}
265
266impl TrackingDispatcher {
267    /// Create new tracking dispatcher with configuration
268    pub fn new(config: DispatcherConfig) -> Self {
269        info!("Creating tracking dispatcher with config: {:?}", config);
270
271        Self {
272            active_strategy: None,
273            tracker_registry: TrackerRegistry::new(),
274            aggregator: Arc::new(LockfreeAggregator::new(
275                std::env::temp_dir().join("memscope_dispatcher"),
276            )),
277            config,
278            metrics: DispatcherMetrics::default(),
279        }
280    }
281
282    /// Select and activate optimal tracking strategy for given environment
283    pub fn select_strategy(
284        &mut self,
285        environment: &RuntimeEnvironment,
286    ) -> Result<TrackingStrategy, BackendError> {
287        debug!("Selecting strategy for environment: {:?}", environment);
288
289        let strategy = match environment {
290            RuntimeEnvironment::SingleThreaded => TrackingStrategy::GlobalDirect,
291            RuntimeEnvironment::MultiThreaded { thread_count } => {
292                if *thread_count <= 2 {
293                    TrackingStrategy::GlobalDirect
294                } else {
295                    TrackingStrategy::ThreadLocal
296                }
297            }
298            RuntimeEnvironment::AsyncRuntime { .. } => TrackingStrategy::TaskLocal,
299            RuntimeEnvironment::Hybrid {
300                thread_count,
301                async_task_count,
302            } => {
303                if *thread_count > 1 && *async_task_count > 0 {
304                    TrackingStrategy::HybridTracking
305                } else if *async_task_count > 0 {
306                    TrackingStrategy::TaskLocal
307                } else {
308                    TrackingStrategy::ThreadLocal
309                }
310            }
311        };
312
313        info!(
314            "Selected strategy: {:?} for environment: {:?}",
315            strategy, environment
316        );
317        self.active_strategy = Some(strategy.clone());
318
319        Ok(strategy)
320    }
321
322    /// Activate tracking with selected strategy
323    pub fn activate_tracking(&mut self, strategy: TrackingStrategy) -> Result<(), BackendError> {
324        info!("Activating tracking with strategy: {:?}", strategy);
325
326        let tracker_type = self.strategy_to_tracker_type(&strategy);
327        let tracker = self.tracker_registry.get_or_create_tracker(tracker_type)?;
328
329        // Configure tracker
330        let tracker_config = TrackerConfig::default();
331        tracker.initialize(tracker_config).map_err(|e| {
332            BackendError::TrackingInitializationFailed {
333                reason: format!("Tracker initialization failed: {}", e),
334            }
335        })?;
336
337        // Start tracking
338        tracker
339            .start_tracking()
340            .map_err(|e| BackendError::TrackingInitializationFailed {
341                reason: format!("Failed to start tracking: {}", e),
342            })?;
343
344        self.active_strategy = Some(strategy);
345        self.metrics.active_trackers = self.tracker_registry.count_active_trackers();
346        self.metrics.strategy_switches += 1;
347
348        info!("Tracking activated successfully");
349        Ok(())
350    }
351
352    /// Dispatch tracking operation to active tracker
353    pub fn dispatch_tracking_operation(
354        &mut self,
355        operation: TrackingOperation,
356    ) -> Result<(), BackendError> {
357        let start_time = std::time::Instant::now();
358
359        let strategy = self
360            .active_strategy
361            .as_ref()
362            .ok_or_else(|| BackendError::TrackingInitializationFailed {
363                reason: "No active tracking strategy".to_string(),
364            })?
365            .clone();
366
367        let tracker_type = self.strategy_to_tracker_type(&strategy);
368
369        // Split borrow to avoid conflict
370        {
371            let tracker = self.tracker_registry.get_or_create_tracker(tracker_type)?;
372            // Execute operation
373            Self::execute_operation_static(tracker, operation)?;
374        }
375
376        // Update metrics
377        let dispatch_time = start_time.elapsed().as_micros() as f64;
378        self.update_dispatch_metrics(dispatch_time);
379
380        Ok(())
381    }
382
383    /// Execute tracking operation on specified tracker
384    fn execute_operation_static(
385        tracker: &mut Box<dyn MemoryTracker>,
386        operation: TrackingOperation,
387    ) -> Result<(), BackendError> {
388        match operation {
389            TrackingOperation::StartTracking => {
390                if !tracker.is_active() {
391                    tracker.start_tracking().map_err(|e| {
392                        BackendError::TrackingInitializationFailed {
393                            reason: format!("Failed to start tracking: {}", e),
394                        }
395                    })?;
396                }
397            }
398            TrackingOperation::StopTracking => {
399                if tracker.is_active() {
400                    let _data =
401                        tracker
402                            .stop_tracking()
403                            .map_err(|e| BackendError::DataCollectionError {
404                                reason: format!("Failed to stop tracking: {}", e),
405                            })?;
406                    // Data would be processed here
407                }
408            }
409            TrackingOperation::CollectData => {
410                let stats = tracker.get_statistics();
411                debug!("Collected statistics: {:?}", stats);
412            }
413        }
414
415        Ok(())
416    }
417
418    /// Convert tracking strategy to tracker type
419    fn strategy_to_tracker_type(&self, strategy: &TrackingStrategy) -> TrackerType {
420        match strategy {
421            TrackingStrategy::GlobalDirect => TrackerType::SingleThread,
422            TrackingStrategy::ThreadLocal => TrackerType::MultiThread,
423            TrackingStrategy::TaskLocal => TrackerType::AsyncTracker,
424            TrackingStrategy::HybridTracking => TrackerType::HybridTracker,
425        }
426    }
427
428    /// Update dispatch performance metrics
429    fn update_dispatch_metrics(&mut self, dispatch_time_us: f64) {
430        self.metrics.total_dispatches += 1;
431
432        // Update running average of dispatch latency
433        let weight = 0.1; // Exponential moving average weight
434        self.metrics.avg_dispatch_latency_us =
435            (1.0 - weight) * self.metrics.avg_dispatch_latency_us + weight * dispatch_time_us;
436    }
437
438    /// Collect data from all active trackers
439    pub fn collect_all_data(&mut self) -> Result<Vec<u8>, BackendError> {
440        debug!("Collecting data from all active trackers");
441
442        let mut all_data = Vec::new();
443
444        // Collect from each active tracker
445        if let Some(ref mut tracker) = self.tracker_registry.single_thread_tracker {
446            if tracker.is_active() {
447                let data =
448                    tracker
449                        .stop_tracking()
450                        .map_err(|e| BackendError::DataCollectionError {
451                            reason: format!("Single thread tracker data collection failed: {}", e),
452                        })?;
453                all_data.extend(data);
454            }
455        }
456
457        if let Some(ref mut tracker) = self.tracker_registry.multi_thread_tracker {
458            if tracker.is_active() {
459                let data =
460                    tracker
461                        .stop_tracking()
462                        .map_err(|e| BackendError::DataCollectionError {
463                            reason: format!("Multi thread tracker data collection failed: {}", e),
464                        })?;
465                all_data.extend(data);
466            }
467        }
468
469        if let Some(ref mut tracker) = self.tracker_registry.async_tracker {
470            if tracker.is_active() {
471                let data =
472                    tracker
473                        .stop_tracking()
474                        .map_err(|e| BackendError::DataCollectionError {
475                            reason: format!("Async tracker data collection failed: {}", e),
476                        })?;
477                all_data.extend(data);
478            }
479        }
480
481        if let Some(ref mut tracker) = self.tracker_registry.hybrid_tracker {
482            if tracker.is_active() {
483                let data =
484                    tracker
485                        .stop_tracking()
486                        .map_err(|e| BackendError::DataCollectionError {
487                            reason: format!("Hybrid tracker data collection failed: {}", e),
488                        })?;
489                all_data.extend(data);
490            }
491        }
492
493        info!("Collected {} bytes of tracking data", all_data.len());
494        Ok(all_data)
495    }
496
497    /// Get current dispatcher metrics
498    pub fn get_metrics(&self) -> &DispatcherMetrics {
499        &self.metrics
500    }
501
502    /// Shutdown dispatcher and cleanup all trackers
503    pub fn shutdown(mut self) -> Result<Vec<u8>, BackendError> {
504        info!("Shutting down tracking dispatcher");
505
506        let final_data = self.collect_all_data()?;
507
508        debug!("Dispatcher shutdown completed");
509        Ok(final_data)
510    }
511}
512
513/// Tracking operations that can be dispatched
514#[derive(Debug, Clone)]
515pub enum TrackingOperation {
516    /// Start tracking operation
517    StartTracking,
518    /// Stop tracking operation
519    StopTracking,
520    /// Collect current data
521    CollectData,
522}
523
524// Placeholder tracker implementations
525// These would be replaced with actual implementations
526
527/// Single-threaded tracker implementation
528struct SingleThreadTracker {
529    active: bool,
530    allocations: u64,
531}
532
533impl SingleThreadTracker {
534    fn new() -> Self {
535        Self {
536            active: false,
537            allocations: 0,
538        }
539    }
540}
541
542impl MemoryTracker for SingleThreadTracker {
543    fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
544        debug!("Initializing single thread tracker");
545        Ok(())
546    }
547
548    fn start_tracking(&mut self) -> Result<(), TrackerError> {
549        debug!("Starting single thread tracking");
550        self.active = true;
551        Ok(())
552    }
553
554    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
555        debug!("Stopping single thread tracking");
556        self.active = false;
557        Ok(vec![]) // Placeholder data
558    }
559
560    fn get_statistics(&self) -> TrackerStatistics {
561        TrackerStatistics {
562            allocations_tracked: self.allocations,
563            memory_tracked_bytes: 0,
564            overhead_bytes: 0,
565            tracking_duration_ms: 0,
566        }
567    }
568
569    fn is_active(&self) -> bool {
570        self.active
571    }
572
573    fn tracker_type(&self) -> TrackerType {
574        TrackerType::SingleThread
575    }
576}
577
578/// Multi-threaded tracker implementation
579struct MultiThreadTracker {
580    active: bool,
581}
582
583impl MultiThreadTracker {
584    fn new() -> Self {
585        Self { active: false }
586    }
587}
588
589impl MemoryTracker for MultiThreadTracker {
590    fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
591        debug!("Initializing multi thread tracker");
592        Ok(())
593    }
594
595    fn start_tracking(&mut self) -> Result<(), TrackerError> {
596        debug!("Starting multi thread tracking");
597        self.active = true;
598        Ok(())
599    }
600
601    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
602        debug!("Stopping multi thread tracking");
603        self.active = false;
604        Ok(vec![])
605    }
606
607    fn get_statistics(&self) -> TrackerStatistics {
608        TrackerStatistics {
609            allocations_tracked: 0,
610            memory_tracked_bytes: 0,
611            overhead_bytes: 0,
612            tracking_duration_ms: 0,
613        }
614    }
615
616    fn is_active(&self) -> bool {
617        self.active
618    }
619
620    fn tracker_type(&self) -> TrackerType {
621        TrackerType::MultiThread
622    }
623}
624
625/// Async tracker wrapper
626struct AsyncTrackerWrapper {
627    active: bool,
628}
629
630impl AsyncTrackerWrapper {
631    fn new() -> Self {
632        Self { active: false }
633    }
634}
635
636impl MemoryTracker for AsyncTrackerWrapper {
637    fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
638        debug!("Initializing async tracker wrapper");
639        Ok(())
640    }
641
642    fn start_tracking(&mut self) -> Result<(), TrackerError> {
643        debug!("Starting async tracking");
644        self.active = true;
645        Ok(())
646    }
647
648    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
649        debug!("Stopping async tracking");
650        self.active = false;
651        Ok(vec![])
652    }
653
654    fn get_statistics(&self) -> TrackerStatistics {
655        TrackerStatistics {
656            allocations_tracked: 0,
657            memory_tracked_bytes: 0,
658            overhead_bytes: 0,
659            tracking_duration_ms: 0,
660        }
661    }
662
663    fn is_active(&self) -> bool {
664        self.active
665    }
666
667    fn tracker_type(&self) -> TrackerType {
668        TrackerType::AsyncTracker
669    }
670}
671
672/// Hybrid tracker implementation
673struct HybridTrackerImpl {
674    active: bool,
675}
676
677impl HybridTrackerImpl {
678    fn new() -> Self {
679        Self { active: false }
680    }
681}
682
683impl MemoryTracker for HybridTrackerImpl {
684    fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
685        debug!("Initializing hybrid tracker");
686        Ok(())
687    }
688
689    fn start_tracking(&mut self) -> Result<(), TrackerError> {
690        debug!("Starting hybrid tracking");
691        self.active = true;
692        Ok(())
693    }
694
695    fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
696        debug!("Stopping hybrid tracking");
697        self.active = false;
698        Ok(vec![])
699    }
700
701    fn get_statistics(&self) -> TrackerStatistics {
702        TrackerStatistics {
703            allocations_tracked: 0,
704            memory_tracked_bytes: 0,
705            overhead_bytes: 0,
706            tracking_duration_ms: 0,
707        }
708    }
709
710    fn is_active(&self) -> bool {
711        self.active
712    }
713
714    fn tracker_type(&self) -> TrackerType {
715        TrackerType::HybridTracker
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722
723    #[test]
724    fn test_dispatcher_creation() {
725        let config = DispatcherConfig::default();
726        let dispatcher = TrackingDispatcher::new(config);
727        assert!(dispatcher.active_strategy.is_none());
728    }
729
730    #[test]
731    fn test_strategy_selection() {
732        let config = DispatcherConfig::default();
733        let mut dispatcher = TrackingDispatcher::new(config);
734
735        let env = RuntimeEnvironment::SingleThreaded;
736        let strategy = dispatcher.select_strategy(&env);
737
738        assert!(strategy.is_ok());
739        assert_eq!(strategy.unwrap(), TrackingStrategy::GlobalDirect);
740    }
741
742    #[test]
743    fn test_tracker_registry() {
744        let mut registry = TrackerRegistry::new();
745        let tracker = registry.get_or_create_tracker(TrackerType::SingleThread);
746        assert!(tracker.is_ok());
747    }
748
749    #[test]
750    fn test_single_thread_tracker() {
751        let mut tracker = SingleThreadTracker::new();
752        assert!(!tracker.is_active());
753
754        let result = tracker.start_tracking();
755        assert!(result.is_ok());
756        assert!(tracker.is_active());
757
758        let data = tracker.stop_tracking();
759        assert!(data.is_ok());
760        assert!(!tracker.is_active());
761    }
762}