memscope_rs/unified/
backend.rs

1// Unified Backend System for Memory Tracking
2// Provides intelligent routing between single-thread, multi-thread, and async tracking strategies
3// Maintains zero-lock architecture and preserves existing JSON export compatibility
4
5use crate::lockfree::aggregator::LockfreeAggregator;
6use std::sync::Arc;
7use thiserror::Error;
8use tracing::{debug, info, warn};
9
10/// Main unified backend that orchestrates all memory tracking strategies
11/// Acts as the central hub for routing tracking requests to appropriate handlers
12pub struct UnifiedBackend {
13    /// Current runtime environment detection result
14    environment: RuntimeEnvironment,
15    /// Active tracking strategy selected based on environment
16    active_strategy: TrackingStrategy,
17    /// Configuration for backend behavior
18    config: BackendConfig,
19    /// Aggregator for collecting data from all tracking sources
20    aggregator: Arc<LockfreeAggregator>,
21}
22
23/// Detected runtime environment characteristics
24/// Used to determine optimal tracking strategy
25#[derive(Debug, Clone, PartialEq)]
26pub enum RuntimeEnvironment {
27    /// Single-threaded execution detected
28    SingleThreaded,
29    /// Multi-threaded execution with thread count
30    MultiThreaded { thread_count: usize },
31    /// Async runtime detected with runtime type
32    AsyncRuntime { runtime_type: AsyncRuntimeType },
33    /// Hybrid mode with both threads and async tasks
34    Hybrid {
35        thread_count: usize,
36        async_task_count: usize,
37    },
38}
39
/// Async runtime families the backend distinguishes between.
///
/// The enum is fieldless, so it implements `Eq` and `Hash` in addition to
/// `PartialEq`; this lets it be used as a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AsyncRuntimeType {
    /// Tokio runtime
    Tokio,
    /// async-std runtime
    AsyncStd,
    /// Custom or unknown runtime
    Custom,
}
50
/// Tracking strategy the backend runs with.
///
/// The enum is fieldless, so it implements `Eq` and `Hash` in addition to
/// `PartialEq`; this lets it be used as a key in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TrackingStrategy {
    /// Direct global tracking for single-threaded apps
    GlobalDirect,
    /// Thread-local storage for multi-threaded apps
    ThreadLocal,
    /// Task-local storage for async applications
    TaskLocal,
    /// Combined strategy for hybrid applications
    HybridTracking,
}
63
64/// Configuration options for backend behavior
65#[derive(Debug, Clone)]
66pub struct BackendConfig {
67    /// Enable automatic environment detection
68    pub auto_detect: bool,
69    /// Forced strategy override (bypasses auto-detection)
70    pub force_strategy: Option<TrackingStrategy>,
71    /// Performance sampling rate (0.0 to 1.0)
72    pub sample_rate: f64,
73    /// Maximum memory overhead percentage
74    pub max_overhead_percent: f64,
75}
76
77/// Tracking session handle for controlling active tracking
78pub struct TrackingSession {
79    /// Session identifier for debugging
80    session_id: String,
81    /// Backend reference for data collection
82    backend: Arc<UnifiedBackend>,
83    /// Session start timestamp
84    start_time: std::time::Instant,
85}
86
87/// Comprehensive memory analysis data from tracking session
88#[derive(Debug)]
89pub struct MemoryAnalysisData {
90    /// Raw tracking data from all sources
91    pub raw_data: Vec<u8>,
92    /// Aggregated statistics
93    pub statistics: MemoryStatistics,
94    /// Environment context for analysis
95    pub environment: RuntimeEnvironment,
96    /// Session metadata
97    pub session_metadata: SessionMetadata,
98}
99
/// Statistical summary of a memory tracking session.
///
/// All fields are plain numbers, so the type derives `Clone`, `PartialEq`
/// and `Default` (all-zero) in addition to `Debug`; this makes it easy to
/// copy, compare in tests, and build an empty summary.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct MemoryStatistics {
    /// Total allocations tracked
    pub total_allocations: usize,
    /// Peak memory usage, in bytes
    pub peak_memory_bytes: usize,
    /// Average allocation size
    pub avg_allocation_size: f64,
    /// Tracking session duration, in milliseconds
    pub session_duration_ms: u64,
}
112
113/// Session metadata for analysis context
114#[derive(Debug)]
115pub struct SessionMetadata {
116    /// Session unique identifier
117    pub session_id: String,
118    /// Environment detection results
119    pub detected_environment: RuntimeEnvironment,
120    /// Strategy used for tracking
121    pub strategy_used: TrackingStrategy,
122    /// Performance overhead measured
123    pub overhead_percent: f64,
124}
125
126/// Backend operation errors
127#[derive(Error, Debug)]
128pub enum BackendError {
129    /// Environment detection failed
130    #[error("Failed to detect runtime environment: {reason}")]
131    EnvironmentDetectionFailed { reason: String },
132
133    /// Strategy selection failed
134    #[error("Cannot select appropriate tracking strategy for environment: {environment:?}")]
135    StrategySelectionFailed { environment: RuntimeEnvironment },
136
137    /// Tracking initialization failed
138    #[error("Failed to initialize tracking session: {reason}")]
139    TrackingInitializationFailed { reason: String },
140
141    /// Data collection error
142    #[error("Error collecting tracking data: {reason}")]
143    DataCollectionError { reason: String },
144
145    /// Configuration validation error
146    #[error("Invalid backend configuration: {reason}")]
147    ConfigurationError { reason: String },
148}
149
150impl Default for BackendConfig {
151    /// Default configuration optimized for most use cases
152    fn default() -> Self {
153        Self {
154            auto_detect: true,
155            force_strategy: None,
156            sample_rate: 1.0,
157            max_overhead_percent: 5.0,
158        }
159    }
160}
161
162impl UnifiedBackend {
163    /// Initialize unified backend with configuration
164    /// Performs environment detection and strategy selection
165    pub fn initialize(config: BackendConfig) -> Result<Self, BackendError> {
166        // Validate configuration parameters
167        if config.sample_rate < 0.0 || config.sample_rate > 1.0 {
168            return Err(BackendError::ConfigurationError {
169                reason: "Sample rate must be between 0.0 and 1.0".to_string(),
170            });
171        }
172
173        if config.max_overhead_percent < 0.0 || config.max_overhead_percent > 100.0 {
174            return Err(BackendError::ConfigurationError {
175                reason: "Max overhead percent must be between 0.0 and 100.0".to_string(),
176            });
177        }
178
179        info!("Initializing unified backend with config: {:?}", config);
180
181        // Detect runtime environment
182        let environment = if config.auto_detect {
183            Self::detect_environment()?
184        } else {
185            RuntimeEnvironment::SingleThreaded // Default fallback
186        };
187
188        debug!("Detected environment: {:?}", environment);
189
190        // Select optimal tracking strategy
191        let active_strategy = if let Some(forced) = config.force_strategy.clone() {
192            warn!("Using forced strategy: {:?}", forced);
193            forced
194        } else {
195            Self::select_strategy(&environment)?
196        };
197
198        info!("Selected tracking strategy: {:?}", active_strategy);
199
200        // Initialize aggregator for data collection
201        let output_dir = std::env::temp_dir().join("memscope_unified");
202        let aggregator = Arc::new(LockfreeAggregator::new(output_dir));
203
204        Ok(Self {
205            environment,
206            active_strategy,
207            config,
208            aggregator,
209        })
210    }
211
212    /// Detect current runtime environment characteristics
213    /// Analyzes thread count, async runtime presence, and execution patterns
214    pub fn detect_environment() -> Result<RuntimeEnvironment, BackendError> {
215        debug!("Starting environment detection");
216
217        // Check for async runtime presence
218        let async_runtime = Self::detect_async_runtime();
219
220        // Count available threads
221        let thread_count = std::thread::available_parallelism()
222            .map(|p| p.get())
223            .unwrap_or(1);
224
225        // Determine environment type based on detection results
226        let environment = match (async_runtime, thread_count) {
227            (Some(runtime_type), 0) => {
228                // Edge case: async runtime detected but no threads available
229                RuntimeEnvironment::AsyncRuntime { runtime_type }
230            }
231            (Some(runtime_type), 1) => RuntimeEnvironment::AsyncRuntime { runtime_type },
232            (Some(_runtime_type), threads) => {
233                RuntimeEnvironment::Hybrid {
234                    thread_count: threads,
235                    async_task_count: 0, // Will be detected during runtime
236                }
237            }
238            (None, 1) => RuntimeEnvironment::SingleThreaded,
239            (None, threads) => RuntimeEnvironment::MultiThreaded {
240                thread_count: threads,
241            },
242        };
243
244        debug!("Environment detection completed: {:?}", environment);
245        Ok(environment)
246    }
247
248    /// Detect presence and type of async runtime
249    /// Returns None if no async runtime is detected
250    fn detect_async_runtime() -> Option<AsyncRuntimeType> {
251        // Check for Tokio runtime
252        if Self::is_tokio_present() {
253            debug!("Tokio runtime detected");
254            return Some(AsyncRuntimeType::Tokio);
255        }
256
257        // Check for async-std runtime
258        if Self::is_async_std_present() {
259            debug!("async-std runtime detected");
260            return Some(AsyncRuntimeType::AsyncStd);
261        }
262
263        // No known async runtime detected
264        debug!("No async runtime detected");
265        None
266    }
267
268    /// Check if Tokio runtime is currently active
269    fn is_tokio_present() -> bool {
270        // Use feature detection or runtime introspection
271        // This is a simplified implementation - real detection would be more sophisticated
272        std::env::var("TOKIO_WORKER_THREADS").is_ok()
273        // Note: tokio::runtime::Handle::try_current() requires tokio dependency
274    }
275
276    /// Check if async-std runtime is active
277    fn is_async_std_present() -> bool {
278        // async-std detection logic would go here
279        // This is a placeholder for actual implementation
280        false
281    }
282
283    /// Select optimal tracking strategy based on environment
284    fn select_strategy(environment: &RuntimeEnvironment) -> Result<TrackingStrategy, BackendError> {
285        let strategy = match environment {
286            RuntimeEnvironment::SingleThreaded => TrackingStrategy::GlobalDirect,
287            RuntimeEnvironment::MultiThreaded { .. } => TrackingStrategy::ThreadLocal,
288            RuntimeEnvironment::AsyncRuntime { .. } => TrackingStrategy::TaskLocal,
289            RuntimeEnvironment::Hybrid { .. } => TrackingStrategy::HybridTracking,
290        };
291
292        debug!(
293            "Selected strategy {:?} for environment {:?}",
294            strategy, environment
295        );
296        Ok(strategy)
297    }
298
299    /// Start active memory tracking session
300    /// Returns session handle for controlling tracking lifecycle
301    pub fn start_tracking(&mut self) -> Result<TrackingSession, BackendError> {
302        let session_id = format!(
303            "session_{}",
304            std::time::SystemTime::now()
305                .duration_since(std::time::UNIX_EPOCH)
306                .map_err(|e| BackendError::TrackingInitializationFailed {
307                    reason: format!("Failed to generate session ID: {}", e),
308                })?
309                .as_millis()
310        );
311
312        info!("Starting tracking session: {}", session_id);
313
314        // Initialize tracking based on selected strategy
315        match self.active_strategy {
316            TrackingStrategy::GlobalDirect => {
317                self.initialize_global_tracking()?;
318            }
319            TrackingStrategy::ThreadLocal => {
320                self.initialize_thread_local_tracking()?;
321            }
322            TrackingStrategy::TaskLocal => {
323                self.initialize_task_local_tracking()?;
324            }
325            TrackingStrategy::HybridTracking => {
326                self.initialize_hybrid_tracking()?;
327            }
328        }
329
330        let session = TrackingSession {
331            session_id: session_id.clone(),
332            backend: Arc::new(self.clone()),
333            start_time: std::time::Instant::now(),
334        };
335
336        debug!("Tracking session {} started successfully", session_id);
337        Ok(session)
338    }
339
340    /// Initialize global direct tracking for single-threaded applications
341    fn initialize_global_tracking(&mut self) -> Result<(), BackendError> {
342        debug!("Initializing global direct tracking");
343        // Implementation will integrate with existing lockfree aggregator
344        Ok(())
345    }
346
347    /// Initialize thread-local tracking for multi-threaded applications
348    fn initialize_thread_local_tracking(&mut self) -> Result<(), BackendError> {
349        debug!("Initializing thread-local tracking");
350        // Implementation will use thread_local! storage
351        Ok(())
352    }
353
354    /// Initialize task-local tracking for async applications
355    fn initialize_task_local_tracking(&mut self) -> Result<(), BackendError> {
356        debug!("Initializing task-local tracking");
357        // Implementation will integrate with AsyncMemoryTracker
358        Ok(())
359    }
360
361    /// Initialize hybrid tracking for complex applications
362    fn initialize_hybrid_tracking(&mut self) -> Result<(), BackendError> {
363        debug!("Initializing hybrid tracking");
364        // Implementation will combine multiple strategies
365        Ok(())
366    }
367
368    /// Collect all tracking data from active session
369    /// Aggregates data from all tracking sources into unified format
370    pub fn collect_data(&self) -> Result<MemoryAnalysisData, BackendError> {
371        debug!("Collecting tracking data");
372
373        // Collect raw data from aggregator (placeholder for now)
374        // TODO: Implement proper aggregator data collection
375        let raw_data = vec![];
376
377        // Calculate statistics
378        let statistics = self.calculate_statistics(&raw_data)?;
379
380        // Create session metadata
381        let session_metadata = SessionMetadata {
382            session_id: "current_session".to_string(), // Will be proper session ID
383            detected_environment: self.environment.clone(),
384            strategy_used: self.active_strategy.clone(),
385            overhead_percent: self.measure_overhead(),
386        };
387
388        let analysis_data = MemoryAnalysisData {
389            raw_data,
390            statistics,
391            environment: self.environment.clone(),
392            session_metadata,
393        };
394
395        info!(
396            "Data collection completed, {} allocations tracked",
397            analysis_data.statistics.total_allocations
398        );
399
400        Ok(analysis_data)
401    }
402
403    /// Calculate statistical summary from raw tracking data
404    fn calculate_statistics(&self, _raw_data: &[u8]) -> Result<MemoryStatistics, BackendError> {
405        // This would parse the raw data and calculate actual statistics
406        // For now, return placeholder statistics
407        Ok(MemoryStatistics {
408            total_allocations: 0,
409            peak_memory_bytes: 0,
410            avg_allocation_size: 0.0,
411            session_duration_ms: 0,
412        })
413    }
414
415    /// Measure current tracking overhead percentage
416    fn measure_overhead(&self) -> f64 {
417        // Implementation would measure actual performance impact
418        // Return configured max overhead as placeholder
419        self.config.max_overhead_percent
420    }
421
422    /// Shutdown backend and finalize all tracking
423    pub fn shutdown(self) -> Result<MemoryAnalysisData, BackendError> {
424        info!("Shutting down unified backend");
425
426        // Collect final data before shutdown
427        let final_data = self.collect_data()?;
428
429        debug!("Backend shutdown completed successfully");
430        Ok(final_data)
431    }
432}
433
434// Required for Arc usage in TrackingSession
435impl Clone for UnifiedBackend {
436    fn clone(&self) -> Self {
437        Self {
438            environment: self.environment.clone(),
439            active_strategy: self.active_strategy.clone(),
440            config: self.config.clone(),
441            aggregator: Arc::clone(&self.aggregator),
442        }
443    }
444}
445
446impl TrackingSession {
447    /// Get session identifier
448    pub fn session_id(&self) -> &str {
449        &self.session_id
450    }
451
452    /// Get elapsed time since session start
453    pub fn elapsed_time(&self) -> std::time::Duration {
454        self.start_time.elapsed()
455    }
456
457    /// Collect current tracking data without ending session
458    pub fn collect_data(&self) -> Result<MemoryAnalysisData, BackendError> {
459        self.backend.collect_data()
460    }
461
462    /// End tracking session and collect final data
463    pub fn end_session(self) -> Result<MemoryAnalysisData, BackendError> {
464        info!("Ending tracking session: {}", self.session_id);
465
466        let final_data = self.backend.collect_data()?;
467
468        debug!(
469            "Session {} ended after {:?}",
470            self.session_id,
471            self.start_time.elapsed()
472        );
473
474        Ok(final_data)
475    }
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must be accepted.
    #[test]
    fn test_backend_initialization() {
        let outcome = UnifiedBackend::initialize(BackendConfig::default());
        assert!(outcome.is_ok());
    }

    /// Environment detection must succeed.
    #[test]
    fn test_environment_detection() {
        assert!(UnifiedBackend::detect_environment().is_ok());
    }

    /// A sample rate above 1.0 must be rejected as a configuration error.
    #[test]
    fn test_invalid_config_sample_rate() {
        let out_of_range = BackendConfig {
            sample_rate: 1.5,
            ..Default::default()
        };

        let outcome = UnifiedBackend::initialize(out_of_range);

        assert!(matches!(
            outcome,
            Err(BackendError::ConfigurationError { .. })
        ));
    }

    /// A single-threaded environment maps to the global direct strategy.
    #[test]
    fn test_strategy_selection() {
        let selected = UnifiedBackend::select_strategy(&RuntimeEnvironment::SingleThreaded);
        assert!(matches!(selected, Ok(TrackingStrategy::GlobalDirect)));
    }
}