1use crate::lockfree::aggregator::LockfreeAggregator;
6use crate::unified::backend::{BackendError, RuntimeEnvironment, TrackingStrategy};
7use std::collections::HashMap;
8use std::sync::Arc;
9use thiserror::Error;
10use tracing::{debug, error, info};
11
12pub struct TrackingDispatcher {
15 active_strategy: Option<TrackingStrategy>,
17 tracker_registry: TrackerRegistry,
19 #[allow(dead_code)]
21 aggregator: Arc<LockfreeAggregator>,
22 #[allow(dead_code)]
24 config: DispatcherConfig,
25 metrics: DispatcherMetrics,
27}
28
29struct TrackerRegistry {
32 single_thread_tracker: Option<Box<dyn MemoryTracker>>,
34 multi_thread_tracker: Option<Box<dyn MemoryTracker>>,
36 async_tracker: Option<Box<dyn MemoryTracker>>,
38 hybrid_tracker: Option<Box<dyn MemoryTracker>>,
40}
41
/// Settings handed to the dispatcher when it is constructed.
///
/// NOTE(review): within this file the values are stored and logged but never
/// consulted, so the per-field descriptions are inferred from the field names
/// and should be confirmed against the intended design.
#[derive(Debug, Clone)]
pub struct DispatcherConfig {
    /// Presumably enables automatic switching between strategies (default `true`).
    pub auto_switch_strategies: bool,
    /// Presumably an upper bound on simultaneously active trackers (default `4`).
    pub max_concurrent_trackers: usize,
    /// Presumably the metrics refresh period in milliseconds (default `1000`).
    pub metrics_interval_ms: u64,
    /// Presumably a memory limit in megabytes (default `100`).
    pub memory_threshold_mb: usize,
}
54
/// Running figures the dispatcher maintains about its own activity.
#[derive(Debug, Clone)]
pub struct DispatcherMetrics {
    /// Number of tracking operations dispatched successfully.
    pub total_dispatches: u64,
    /// Number of successful `activate_tracking` calls.
    pub strategy_switches: u64,
    /// Exponentially smoothed dispatch latency in microseconds; each new
    /// sample contributes with weight 0.1.
    pub avg_dispatch_latency_us: f64,
    /// Memory overhead as a percentage.
    /// NOTE(review): never written in this file after being defaulted to 0.0.
    pub memory_overhead_percent: f64,
    /// Number of trackers that reported themselves active when the dispatcher
    /// last refreshed this figure.
    pub active_trackers: usize,
}
69
70pub trait MemoryTracker: Send + Sync {
73 fn initialize(&mut self, config: TrackerConfig) -> Result<(), TrackerError>;
75
76 fn start_tracking(&mut self) -> Result<(), TrackerError>;
78
79 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError>;
81
82 fn get_statistics(&self) -> TrackerStatistics;
84
85 fn is_active(&self) -> bool;
87
88 fn tracker_type(&self) -> TrackerType;
90}
91
/// Settings passed to `MemoryTracker::initialize`.
///
/// NOTE(review): the trackers defined in this file ignore the configuration,
/// so the per-field descriptions are inferred from the field names — confirm.
#[derive(Debug, Clone)]
pub struct TrackerConfig {
    /// Sampling rate (default `1.0`); presumably a fraction in `0.0..=1.0`.
    pub sample_rate: f64,
    /// Presumably the tracker's own memory budget in megabytes (default `50`).
    pub max_overhead_mb: usize,
    /// Optional list of indices, presumably CPU cores to pin to (default `None`).
    pub thread_affinity: Option<Vec<usize>>,
    /// Free-form key/value parameters (default empty).
    pub custom_params: HashMap<String, String>,
}
104
/// Snapshot returned by `MemoryTracker::get_statistics`.
#[derive(Debug, Clone)]
pub struct TrackerStatistics {
    /// Count of allocations the tracker has recorded.
    pub allocations_tracked: u64,
    /// Total size, in bytes, of the memory the tracker has recorded.
    pub memory_tracked_bytes: u64,
    /// Bytes attributed to the tracker's own overhead.
    pub overhead_bytes: u64,
    /// Tracking duration in milliseconds.
    pub tracking_duration_ms: u64,
}
117
/// Kinds of tracker the registry can hold; each variant selects one slot and
/// one built-in implementation.
#[derive(Debug, Clone, PartialEq)]
pub enum TrackerType {
    /// Backed by `SingleThreadTracker`.
    SingleThread,
    /// Backed by `MultiThreadTracker`.
    MultiThread,
    /// Backed by `AsyncTrackerWrapper`.
    AsyncTracker,
    /// Backed by `HybridTrackerImpl`.
    HybridTracker,
}
130
131#[derive(Error, Debug)]
133pub enum TrackerError {
134 #[error("Tracker initialization failed: {reason}")]
136 InitializationFailed { reason: String },
137
138 #[error("Failed to start tracking: {reason}")]
140 StartFailed { reason: String },
141
142 #[error("Failed to collect tracking data: {reason}")]
144 DataCollectionFailed { reason: String },
145
146 #[error("Invalid tracker configuration: {reason}")]
148 InvalidConfiguration { reason: String },
149
150 #[error("JSON processing error: {source}")]
152 JsonError {
153 #[from]
154 source: serde_json::Error,
155 },
156}
157
158impl Default for DispatcherConfig {
159 fn default() -> Self {
161 Self {
162 auto_switch_strategies: true,
163 max_concurrent_trackers: 4,
164 metrics_interval_ms: 1000,
165 memory_threshold_mb: 100,
166 }
167 }
168}
169
170impl Default for DispatcherMetrics {
171 fn default() -> Self {
173 Self {
174 total_dispatches: 0,
175 strategy_switches: 0,
176 avg_dispatch_latency_us: 0.0,
177 memory_overhead_percent: 0.0,
178 active_trackers: 0,
179 }
180 }
181}
182
183impl Default for TrackerConfig {
184 fn default() -> Self {
186 Self {
187 sample_rate: 1.0,
188 max_overhead_mb: 50,
189 thread_affinity: None,
190 custom_params: HashMap::new(),
191 }
192 }
193}
194
195impl TrackerRegistry {
196 fn new() -> Self {
198 Self {
199 single_thread_tracker: None,
200 multi_thread_tracker: None,
201 async_tracker: None,
202 hybrid_tracker: None,
203 }
204 }
205
206 fn get_or_create_tracker(
208 &mut self,
209 tracker_type: TrackerType,
210 ) -> Result<&mut Box<dyn MemoryTracker>, BackendError> {
211 let tracker = match tracker_type {
212 TrackerType::SingleThread => &mut self.single_thread_tracker,
213 TrackerType::MultiThread => &mut self.multi_thread_tracker,
214 TrackerType::AsyncTracker => &mut self.async_tracker,
215 TrackerType::HybridTracker => &mut self.hybrid_tracker,
216 };
217
218 if tracker.is_none() {
219 *tracker = Some(Self::create_tracker(&tracker_type)?);
220 }
221
222 tracker
223 .as_mut()
224 .ok_or_else(|| BackendError::TrackingInitializationFailed {
225 reason: format!("Failed to create {:?} tracker", tracker_type),
226 })
227 }
228
229 fn create_tracker(tracker_type: &TrackerType) -> Result<Box<dyn MemoryTracker>, BackendError> {
231 match tracker_type {
232 TrackerType::SingleThread => Ok(Box::new(SingleThreadTracker::new())),
233 TrackerType::MultiThread => Ok(Box::new(MultiThreadTracker::new())),
234 TrackerType::AsyncTracker => Ok(Box::new(AsyncTrackerWrapper::new())),
235 TrackerType::HybridTracker => Ok(Box::new(HybridTrackerImpl::new())),
236 }
237 }
238
239 fn count_active_trackers(&self) -> usize {
241 let mut count = 0;
242 if let Some(ref tracker) = self.single_thread_tracker {
243 if tracker.is_active() {
244 count += 1;
245 }
246 }
247 if let Some(ref tracker) = self.multi_thread_tracker {
248 if tracker.is_active() {
249 count += 1;
250 }
251 }
252 if let Some(ref tracker) = self.async_tracker {
253 if tracker.is_active() {
254 count += 1;
255 }
256 }
257 if let Some(ref tracker) = self.hybrid_tracker {
258 if tracker.is_active() {
259 count += 1;
260 }
261 }
262 count
263 }
264}
265
266impl TrackingDispatcher {
267 pub fn new(config: DispatcherConfig) -> Self {
269 info!("Creating tracking dispatcher with config: {:?}", config);
270
271 Self {
272 active_strategy: None,
273 tracker_registry: TrackerRegistry::new(),
274 aggregator: Arc::new(LockfreeAggregator::new(
275 std::env::temp_dir().join("memscope_dispatcher"),
276 )),
277 config,
278 metrics: DispatcherMetrics::default(),
279 }
280 }
281
282 pub fn select_strategy(
284 &mut self,
285 environment: &RuntimeEnvironment,
286 ) -> Result<TrackingStrategy, BackendError> {
287 debug!("Selecting strategy for environment: {:?}", environment);
288
289 let strategy = match environment {
290 RuntimeEnvironment::SingleThreaded => TrackingStrategy::GlobalDirect,
291 RuntimeEnvironment::MultiThreaded { thread_count } => {
292 if *thread_count <= 2 {
293 TrackingStrategy::GlobalDirect
294 } else {
295 TrackingStrategy::ThreadLocal
296 }
297 }
298 RuntimeEnvironment::AsyncRuntime { .. } => TrackingStrategy::TaskLocal,
299 RuntimeEnvironment::Hybrid {
300 thread_count,
301 async_task_count,
302 } => {
303 if *thread_count > 1 && *async_task_count > 0 {
304 TrackingStrategy::HybridTracking
305 } else if *async_task_count > 0 {
306 TrackingStrategy::TaskLocal
307 } else {
308 TrackingStrategy::ThreadLocal
309 }
310 }
311 };
312
313 info!(
314 "Selected strategy: {:?} for environment: {:?}",
315 strategy, environment
316 );
317 self.active_strategy = Some(strategy.clone());
318
319 Ok(strategy)
320 }
321
322 pub fn activate_tracking(&mut self, strategy: TrackingStrategy) -> Result<(), BackendError> {
324 info!("Activating tracking with strategy: {:?}", strategy);
325
326 let tracker_type = self.strategy_to_tracker_type(&strategy);
327 let tracker = self.tracker_registry.get_or_create_tracker(tracker_type)?;
328
329 let tracker_config = TrackerConfig::default();
331 tracker.initialize(tracker_config).map_err(|e| {
332 BackendError::TrackingInitializationFailed {
333 reason: format!("Tracker initialization failed: {}", e),
334 }
335 })?;
336
337 tracker
339 .start_tracking()
340 .map_err(|e| BackendError::TrackingInitializationFailed {
341 reason: format!("Failed to start tracking: {}", e),
342 })?;
343
344 self.active_strategy = Some(strategy);
345 self.metrics.active_trackers = self.tracker_registry.count_active_trackers();
346 self.metrics.strategy_switches += 1;
347
348 info!("Tracking activated successfully");
349 Ok(())
350 }
351
352 pub fn dispatch_tracking_operation(
354 &mut self,
355 operation: TrackingOperation,
356 ) -> Result<(), BackendError> {
357 let start_time = std::time::Instant::now();
358
359 let strategy = self
360 .active_strategy
361 .as_ref()
362 .ok_or_else(|| BackendError::TrackingInitializationFailed {
363 reason: "No active tracking strategy".to_string(),
364 })?
365 .clone();
366
367 let tracker_type = self.strategy_to_tracker_type(&strategy);
368
369 {
371 let tracker = self.tracker_registry.get_or_create_tracker(tracker_type)?;
372 Self::execute_operation_static(tracker, operation)?;
374 }
375
376 let dispatch_time = start_time.elapsed().as_micros() as f64;
378 self.update_dispatch_metrics(dispatch_time);
379
380 Ok(())
381 }
382
383 fn execute_operation_static(
385 tracker: &mut Box<dyn MemoryTracker>,
386 operation: TrackingOperation,
387 ) -> Result<(), BackendError> {
388 match operation {
389 TrackingOperation::StartTracking => {
390 if !tracker.is_active() {
391 tracker.start_tracking().map_err(|e| {
392 BackendError::TrackingInitializationFailed {
393 reason: format!("Failed to start tracking: {}", e),
394 }
395 })?;
396 }
397 }
398 TrackingOperation::StopTracking => {
399 if tracker.is_active() {
400 let _data =
401 tracker
402 .stop_tracking()
403 .map_err(|e| BackendError::DataCollectionError {
404 reason: format!("Failed to stop tracking: {}", e),
405 })?;
406 }
408 }
409 TrackingOperation::CollectData => {
410 let stats = tracker.get_statistics();
411 debug!("Collected statistics: {:?}", stats);
412 }
413 }
414
415 Ok(())
416 }
417
418 fn strategy_to_tracker_type(&self, strategy: &TrackingStrategy) -> TrackerType {
420 match strategy {
421 TrackingStrategy::GlobalDirect => TrackerType::SingleThread,
422 TrackingStrategy::ThreadLocal => TrackerType::MultiThread,
423 TrackingStrategy::TaskLocal => TrackerType::AsyncTracker,
424 TrackingStrategy::HybridTracking => TrackerType::HybridTracker,
425 }
426 }
427
428 fn update_dispatch_metrics(&mut self, dispatch_time_us: f64) {
430 self.metrics.total_dispatches += 1;
431
432 let weight = 0.1; self.metrics.avg_dispatch_latency_us =
435 (1.0 - weight) * self.metrics.avg_dispatch_latency_us + weight * dispatch_time_us;
436 }
437
438 pub fn collect_all_data(&mut self) -> Result<Vec<u8>, BackendError> {
440 debug!("Collecting data from all active trackers");
441
442 let mut all_data = Vec::new();
443
444 if let Some(ref mut tracker) = self.tracker_registry.single_thread_tracker {
446 if tracker.is_active() {
447 let data =
448 tracker
449 .stop_tracking()
450 .map_err(|e| BackendError::DataCollectionError {
451 reason: format!("Single thread tracker data collection failed: {}", e),
452 })?;
453 all_data.extend(data);
454 }
455 }
456
457 if let Some(ref mut tracker) = self.tracker_registry.multi_thread_tracker {
458 if tracker.is_active() {
459 let data =
460 tracker
461 .stop_tracking()
462 .map_err(|e| BackendError::DataCollectionError {
463 reason: format!("Multi thread tracker data collection failed: {}", e),
464 })?;
465 all_data.extend(data);
466 }
467 }
468
469 if let Some(ref mut tracker) = self.tracker_registry.async_tracker {
470 if tracker.is_active() {
471 let data =
472 tracker
473 .stop_tracking()
474 .map_err(|e| BackendError::DataCollectionError {
475 reason: format!("Async tracker data collection failed: {}", e),
476 })?;
477 all_data.extend(data);
478 }
479 }
480
481 if let Some(ref mut tracker) = self.tracker_registry.hybrid_tracker {
482 if tracker.is_active() {
483 let data =
484 tracker
485 .stop_tracking()
486 .map_err(|e| BackendError::DataCollectionError {
487 reason: format!("Hybrid tracker data collection failed: {}", e),
488 })?;
489 all_data.extend(data);
490 }
491 }
492
493 info!("Collected {} bytes of tracking data", all_data.len());
494 Ok(all_data)
495 }
496
497 pub fn get_metrics(&self) -> &DispatcherMetrics {
499 &self.metrics
500 }
501
502 pub fn shutdown(mut self) -> Result<Vec<u8>, BackendError> {
504 info!("Shutting down tracking dispatcher");
505
506 let final_data = self.collect_all_data()?;
507
508 debug!("Dispatcher shutdown completed");
509 Ok(final_data)
510 }
511}
512
/// Operations the dispatcher can forward to the active strategy's tracker.
#[derive(Debug, Clone)]
pub enum TrackingOperation {
    /// Start the tracker unless it is already active.
    StartTracking,
    /// Stop the tracker if it is active; the data it returns is discarded.
    StopTracking,
    /// Read the tracker's statistics and log them at debug level.
    CollectData,
}
523
/// Tracker registered for `TrackerType::SingleThread`.
///
/// In this file it only toggles an activity flag and reports its allocation
/// counter; it gathers no real data.
struct SingleThreadTracker {
    /// `true` between `start_tracking` and `stop_tracking`.
    active: bool,
    /// Value reported as `allocations_tracked`.
    /// NOTE(review): nothing in this file ever increments it.
    allocations: u64,
}
532
533impl SingleThreadTracker {
534 fn new() -> Self {
535 Self {
536 active: false,
537 allocations: 0,
538 }
539 }
540}
541
542impl MemoryTracker for SingleThreadTracker {
543 fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
544 debug!("Initializing single thread tracker");
545 Ok(())
546 }
547
548 fn start_tracking(&mut self) -> Result<(), TrackerError> {
549 debug!("Starting single thread tracking");
550 self.active = true;
551 Ok(())
552 }
553
554 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
555 debug!("Stopping single thread tracking");
556 self.active = false;
557 Ok(vec![]) }
559
560 fn get_statistics(&self) -> TrackerStatistics {
561 TrackerStatistics {
562 allocations_tracked: self.allocations,
563 memory_tracked_bytes: 0,
564 overhead_bytes: 0,
565 tracking_duration_ms: 0,
566 }
567 }
568
569 fn is_active(&self) -> bool {
570 self.active
571 }
572
573 fn tracker_type(&self) -> TrackerType {
574 TrackerType::SingleThread
575 }
576}
577
/// Tracker registered for `TrackerType::MultiThread`.
///
/// In this file it only toggles an activity flag; it gathers no real data.
struct MultiThreadTracker {
    /// `true` between `start_tracking` and `stop_tracking`.
    active: bool,
}
582
583impl MultiThreadTracker {
584 fn new() -> Self {
585 Self { active: false }
586 }
587}
588
589impl MemoryTracker for MultiThreadTracker {
590 fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
591 debug!("Initializing multi thread tracker");
592 Ok(())
593 }
594
595 fn start_tracking(&mut self) -> Result<(), TrackerError> {
596 debug!("Starting multi thread tracking");
597 self.active = true;
598 Ok(())
599 }
600
601 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
602 debug!("Stopping multi thread tracking");
603 self.active = false;
604 Ok(vec![])
605 }
606
607 fn get_statistics(&self) -> TrackerStatistics {
608 TrackerStatistics {
609 allocations_tracked: 0,
610 memory_tracked_bytes: 0,
611 overhead_bytes: 0,
612 tracking_duration_ms: 0,
613 }
614 }
615
616 fn is_active(&self) -> bool {
617 self.active
618 }
619
620 fn tracker_type(&self) -> TrackerType {
621 TrackerType::MultiThread
622 }
623}
624
/// Tracker registered for `TrackerType::AsyncTracker`.
///
/// In this file it only toggles an activity flag; it gathers no real data.
struct AsyncTrackerWrapper {
    /// `true` between `start_tracking` and `stop_tracking`.
    active: bool,
}
629
630impl AsyncTrackerWrapper {
631 fn new() -> Self {
632 Self { active: false }
633 }
634}
635
636impl MemoryTracker for AsyncTrackerWrapper {
637 fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
638 debug!("Initializing async tracker wrapper");
639 Ok(())
640 }
641
642 fn start_tracking(&mut self) -> Result<(), TrackerError> {
643 debug!("Starting async tracking");
644 self.active = true;
645 Ok(())
646 }
647
648 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
649 debug!("Stopping async tracking");
650 self.active = false;
651 Ok(vec![])
652 }
653
654 fn get_statistics(&self) -> TrackerStatistics {
655 TrackerStatistics {
656 allocations_tracked: 0,
657 memory_tracked_bytes: 0,
658 overhead_bytes: 0,
659 tracking_duration_ms: 0,
660 }
661 }
662
663 fn is_active(&self) -> bool {
664 self.active
665 }
666
667 fn tracker_type(&self) -> TrackerType {
668 TrackerType::AsyncTracker
669 }
670}
671
/// Tracker registered for `TrackerType::HybridTracker`.
///
/// In this file it only toggles an activity flag; it gathers no real data.
struct HybridTrackerImpl {
    /// `true` between `start_tracking` and `stop_tracking`.
    active: bool,
}
676
677impl HybridTrackerImpl {
678 fn new() -> Self {
679 Self { active: false }
680 }
681}
682
683impl MemoryTracker for HybridTrackerImpl {
684 fn initialize(&mut self, _config: TrackerConfig) -> Result<(), TrackerError> {
685 debug!("Initializing hybrid tracker");
686 Ok(())
687 }
688
689 fn start_tracking(&mut self) -> Result<(), TrackerError> {
690 debug!("Starting hybrid tracking");
691 self.active = true;
692 Ok(())
693 }
694
695 fn stop_tracking(&mut self) -> Result<Vec<u8>, TrackerError> {
696 debug!("Stopping hybrid tracking");
697 self.active = false;
698 Ok(vec![])
699 }
700
701 fn get_statistics(&self) -> TrackerStatistics {
702 TrackerStatistics {
703 allocations_tracked: 0,
704 memory_tracked_bytes: 0,
705 overhead_bytes: 0,
706 tracking_duration_ms: 0,
707 }
708 }
709
710 fn is_active(&self) -> bool {
711 self.active
712 }
713
714 fn tracker_type(&self) -> TrackerType {
715 TrackerType::HybridTracker
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed dispatcher has no active strategy.
    #[test]
    fn test_dispatcher_creation() {
        let dispatcher = TrackingDispatcher::new(DispatcherConfig::default());
        assert!(dispatcher.active_strategy.is_none());
    }

    /// A single-threaded environment selects the global-direct strategy.
    #[test]
    fn test_strategy_selection() {
        let mut dispatcher = TrackingDispatcher::new(DispatcherConfig::default());

        let environment = RuntimeEnvironment::SingleThreaded;
        let selected = dispatcher.select_strategy(&environment);

        assert!(selected.is_ok());
        assert_eq!(selected.unwrap(), TrackingStrategy::GlobalDirect);
    }

    /// Requesting a tracker from an empty registry succeeds.
    #[test]
    fn test_tracker_registry() {
        let mut registry = TrackerRegistry::new();
        let lookup = registry.get_or_create_tracker(TrackerType::SingleThread);
        assert!(lookup.is_ok());
    }

    /// Starting and stopping flips the single-thread tracker's active flag.
    #[test]
    fn test_single_thread_tracker() {
        let mut tracker = SingleThreadTracker::new();
        assert!(!tracker.is_active());

        let started = tracker.start_tracking();
        assert!(started.is_ok());
        assert!(tracker.is_active());

        let stopped = tracker.stop_tracking();
        assert!(stopped.is_ok());
        assert!(!tracker.is_active());
    }
}
762}