// voirs_conversion/thread_safety.rs

//! Thread safety improvements for voice conversion system
//!
//! This module provides enhanced thread safety patterns, concurrent operation management,
//! and safe resource sharing for the voice conversion system.
6use crate::{
7    config::ConversionConfig,
8    models::ConversionModel,
9    types::{ConversionRequest, ConversionResult, ConversionType},
10    Error, Result,
11};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
15use std::sync::Arc;
16use std::sync::Weak;
17use std::time::{Duration, Instant};
18use tokio::sync::{Mutex, OwnedSemaphorePermit, RwLock, Semaphore};
19use tracing::{debug, info, trace, warn};
20
/// Memory safety auditing system for voice conversion.
///
/// Bundles three independently lockable trackers (allocations, references,
/// buffer safety) behind `Arc<RwLock<_>>` so audits can run concurrently
/// with normal tracking, plus the configuration that controls which audits
/// run and how often.
pub struct MemorySafetyAuditor {
    /// Track memory allocations and detect leaks
    allocation_tracker: Arc<RwLock<AllocationTracker>>,
    /// Track strong/weak references and detected cycles
    reference_tracker: Arc<RwLock<ReferenceTracker>>,
    /// Monitor buffer safety (bounds violations, unsafe operations)
    buffer_safety_monitor: Arc<RwLock<BufferSafetyMonitor>>,
    /// Audit configuration controlling enabled checks and cadence
    audit_config: MemorySafetyConfig,
}
32
/// Configuration for memory safety auditing
#[derive(Debug, Clone)]
pub struct MemorySafetyConfig {
    /// Enable allocation tracking (leak detection, usage counters)
    pub enable_allocation_tracking: bool,
    /// Enable reference cycle detection
    pub enable_reference_cycle_detection: bool,
    /// Enable buffer bounds checking
    pub enable_buffer_bounds_checking: bool,
    /// Maximum memory usage threshold (in bytes); exceeding it is flagged in audits
    pub max_memory_threshold: u64,
    /// Enable automatic cleanup of stale small allocations after each audit
    pub enable_automatic_cleanup: bool,
    /// Audit interval for periodic checks
    pub audit_interval: Duration,
}
49
50impl Default for MemorySafetyConfig {
51    fn default() -> Self {
52        Self {
53            enable_allocation_tracking: true,
54            enable_reference_cycle_detection: true,
55            enable_buffer_bounds_checking: true,
56            max_memory_threshold: 1024 * 1024 * 1024, // 1GB
57            enable_automatic_cleanup: true,
58            audit_interval: Duration::from_secs(30),
59        }
60    }
61}
62
/// Track memory allocations and usage patterns.
///
/// The counters are atomics even though the tracker itself lives behind a
/// `RwLock`; in the visible code they are only mutated under the write lock,
/// so plain load/store sequences on them are race-free there.
#[derive(Debug, Default)]
pub struct AllocationTracker {
    /// Total allocations recorded since creation
    pub total_allocations: AtomicU64,
    /// Total deallocations recorded since creation
    pub total_deallocations: AtomicU64,
    /// Current memory usage in bytes
    pub current_memory_usage: AtomicU64,
    /// Peak (high-water mark) memory usage in bytes
    pub peak_memory_usage: AtomicU64,
    /// Active allocations keyed by allocation ID
    pub active_allocations: HashMap<String, AllocationInfo>,
    /// Observed allocation patterns keyed by pattern name
    pub allocation_patterns: HashMap<String, AllocationPattern>,
    /// Memory leaks detected
    pub detected_leaks: Vec<MemoryLeak>,
}
81
/// Information about a specific allocation
#[derive(Debug, Clone)]
pub struct AllocationInfo {
    /// Unique identifier for this allocation
    pub allocation_id: String,
    /// Size of the allocation in bytes
    pub size: u64,
    /// Timestamp when allocation occurred
    pub timestamp: Instant,
    /// Source location where allocation occurred (free-form string)
    pub location: String,
    /// Type of allocation
    pub allocation_type: AllocationType,
    /// Thread that performed the allocation
    pub thread_id: std::thread::ThreadId,
}
98
/// Type of memory allocation
#[derive(Debug, Clone, PartialEq)]
pub enum AllocationType {
    /// Audio processing buffer for holding audio samples
    AudioBuffer,
    /// Model weights and data for neural networks
    ModelData,
    /// Conversion result cache for storing processed results
    ConversionCache,
    /// Temporary processing buffer for intermediate computations
    TemporaryBuffer,
    /// Configuration data for system settings
    ConfigurationData,
    /// Performance metrics data for monitoring
    MetricsData,
    /// Other type of allocation with custom description
    Other(String),
}
117
/// Pattern of memory allocation behavior
#[derive(Debug, Clone)]
pub struct AllocationPattern {
    /// Name identifying this allocation pattern
    pub pattern_name: String,
    /// Number of allocations following this pattern
    pub allocation_count: u32,
    /// Average size of allocations in bytes
    pub average_size: u64,
    /// Total size of all allocations in bytes
    pub total_size: u64,
    /// Frequency of allocations per second
    pub frequency: f64,
    /// Typical lifetime duration for allocations in this pattern
    pub typical_lifetime: Duration,
}
134
/// Detected memory leak information
#[derive(Debug, Clone)]
pub struct MemoryLeak {
    /// Unique identifier for this leak (derived from the allocation id)
    pub leak_id: String,
    /// Information about the leaked allocation
    pub allocation_info: AllocationInfo,
    /// Timestamp when the leak was detected
    pub leak_detected_at: Instant,
    /// Estimated duration the leak has existed (age of the allocation)
    pub estimated_leak_duration: Duration,
    /// Severity level of the leak, derived from allocation size
    pub severity: LeakSeverity,
}
149
/// Severity of memory leak
#[derive(Debug, Clone, PartialEq)]
pub enum LeakSeverity {
    /// Low severity - small, short-lived leaks with minimal impact
    Low,
    /// Medium severity - moderate size or duration requiring monitoring
    Medium,
    /// High severity - large size or long duration requiring attention
    High,
    /// Critical severity - severe leaks that could cause system instability
    Critical,
}
162
/// Track reference cycles and ownership patterns
#[derive(Debug, Default)]
pub struct ReferenceTracker {
    /// Active strong references keyed by reference ID
    pub strong_references: HashMap<String, ReferenceInfo>,
    /// Active weak references keyed by reference ID
    pub weak_references: HashMap<String, WeakReferenceInfo>,
    /// Detected cycles
    pub detected_cycles: Vec<ReferenceCycle>,
    /// Reference creation patterns keyed by pattern name
    pub reference_patterns: HashMap<String, ReferencePattern>,
}
175
/// Information about a reference
#[derive(Debug, Clone)]
pub struct ReferenceInfo {
    /// Unique identifier for this reference
    pub reference_id: String,
    /// Type of object being referenced
    pub object_type: String,
    /// Timestamp when reference was created
    pub created_at: Instant,
    /// Timestamp of last access to this reference (drives orphan detection)
    pub last_accessed: Instant,
    /// Total number of times this reference has been accessed
    pub access_count: u32,
    /// Source code location where reference was created
    pub source_location: String,
    /// Chain of references leading to this object for cycle detection
    pub reference_chain: Vec<String>,
}
194
/// Information about a weak reference
#[derive(Debug, Clone)]
pub struct WeakReferenceInfo {
    /// Unique identifier for this weak reference
    pub reference_id: String,
    /// Type of object being weakly referenced
    pub object_type: String,
    /// Timestamp when weak reference was created
    pub created_at: Instant,
    /// Whether the weak reference can still be upgraded
    pub is_valid: bool,
    /// Number of times upgrade was attempted
    pub upgrade_attempts: u32,
    /// Number of successful upgrades to strong reference
    pub successful_upgrades: u32,
}
211
/// Detected reference cycle
#[derive(Debug, Clone)]
pub struct ReferenceCycle {
    /// Unique identifier for this cycle
    pub cycle_id: String,
    /// List of object IDs participating in the cycle
    pub objects_in_cycle: Vec<String>,
    /// Number of objects in the cycle
    pub cycle_length: usize,
    /// Timestamp when cycle was detected
    pub detected_at: Instant,
    /// Classification of the cycle type
    pub cycle_type: CycleType,
    /// Estimated memory impact in bytes
    pub estimated_memory_impact: u64,
}
228
/// Type of reference cycle
#[derive(Debug, Clone, PartialEq)]
pub enum CycleType {
    /// Direct cycle between two objects (A -> B -> A)
    DirectCycle,
    /// Indirect cycle through multiple objects (A -> B -> C -> A)
    IndirectCycle,
    /// Complex cycle with multiple interconnected cycles
    ComplexCycle,
}
239
/// Pattern of reference creation and usage
#[derive(Debug, Clone)]
pub struct ReferencePattern {
    /// Name identifying this reference pattern
    pub pattern_name: String,
    /// Frequency of reference creation per second
    pub creation_frequency: f64,
    /// Average lifetime before reference is dropped
    pub average_lifetime: Duration,
    /// Typical access pattern observed for these references
    pub typical_access_pattern: AccessPattern,
    /// Common chains of references observed in this pattern
    pub common_reference_chains: Vec<Vec<String>>,
}
254
/// Access pattern for references
#[derive(Debug, Clone, PartialEq)]
pub enum AccessPattern {
    /// Used once and then dropped immediately
    SingleAccess,
    /// Heavy usage concentrated in short time periods
    BurstAccess,
    /// Regular consistent access throughout lifetime
    SteadyAccess,
    /// Access frequency decreases over time
    DecreasingAccess,
    /// Regular periodic access with predictable intervals
    PeriodicAccess,
}
269
/// Monitor buffer safety and bounds checking
#[derive(Debug, Default)]
pub struct BufferSafetyMonitor {
    /// Buffer bounds violations observed
    pub bounds_violations: Vec<BoundsViolation>,
    /// Buffer usage statistics keyed by buffer ID
    pub buffer_stats: HashMap<String, BufferStats>,
    /// Unsafe buffer operations detected
    pub unsafe_operations: Vec<UnsafeOperation>,
    /// Buffer lifecycle tracking keyed by buffer ID
    pub buffer_lifecycle: HashMap<String, BufferLifecycle>,
}
282
/// Buffer bounds violation information
#[derive(Debug, Clone)]
pub struct BoundsViolation {
    /// Unique identifier for this violation
    pub violation_id: String,
    /// ID of the buffer where violation occurred
    pub buffer_id: String,
    /// Type of bounds violation detected
    pub violation_type: ViolationType,
    /// Index that was attempted to access (signed, so negative indices can be recorded)
    pub attempted_index: isize,
    /// Actual size of the buffer
    pub buffer_size: usize,
    /// Stack trace at time of violation
    pub stack_trace: String,
    /// Timestamp when violation was detected
    pub detected_at: Instant,
    /// Severity level of the violation
    pub severity: ViolationSeverity,
}
303
/// Type of bounds violation
#[derive(Debug, Clone, PartialEq)]
pub enum ViolationType {
    /// Attempted to read data beyond buffer bounds
    ReadBeyondBounds,
    /// Attempted to write data beyond buffer bounds
    WriteBeyondBounds,
    /// Attempted to access buffer with negative index
    NegativeIndex,
    /// Attempted to use buffer after it was freed
    UseAfterFree,
    /// Attempted to free the same buffer twice
    DoubleFree,
}
318
/// Severity of bounds violation
#[derive(Debug, Clone, PartialEq)]
pub enum ViolationSeverity {
    /// Warning - potential issue but handled safely
    Warning,
    /// Error - definite violation that was caught and handled
    Error,
    /// Critical - violation that could cause undefined behavior or crashes
    Critical,
}
329
/// Statistics for buffer usage
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Unique identifier for this buffer
    pub buffer_id: String,
    /// Type classification of the buffer (free-form string)
    pub buffer_type: String,
    /// Current size of the buffer in elements
    pub size: usize,
    /// Total number of times buffer was accessed
    pub access_count: u32,
    /// Number of read operations performed
    pub read_operations: u32,
    /// Number of write operations performed
    pub write_operations: u32,
    /// Number of times buffer was resized
    pub resize_operations: u32,
    /// Timestamp of first access to buffer
    pub first_access: Instant,
    /// Timestamp of most recent access
    pub last_access: Instant,
    /// Average time interval between accesses
    pub average_access_interval: Duration,
}
354
/// Unsafe operation detected
#[derive(Debug, Clone)]
pub struct UnsafeOperation {
    /// Unique identifier for this unsafe operation
    pub operation_id: String,
    /// Type of unsafe operation detected
    pub operation_type: UnsafeOperationType,
    /// ID of buffer involved in unsafe operation
    pub buffer_id: String,
    /// Timestamp when operation was detected
    pub detected_at: Instant,
    /// Risk level of the unsafe operation
    pub risk_level: RiskLevel,
    /// Description of mitigation if automatically applied, `None` otherwise
    pub mitigation_applied: Option<String>,
}
371
/// Type of unsafe operation
#[derive(Debug, Clone, PartialEq)]
pub enum UnsafeOperationType {
    /// Memory access not aligned to required boundaries
    UnalignedAccess,
    /// Data race detected from concurrent unsynchronized access
    RacyAccess,
    /// Pointer references deallocated or invalid memory
    DanglingPointer,
    /// Write operation exceeded buffer capacity
    BufferOverflow,
    /// Attempted to use value after it was moved
    UseAfterMove,
    /// Multiple threads mutating shared data without synchronization
    ConcurrentMutation,
}
388
/// Risk level of unsafe operation
#[derive(Debug, Clone, PartialEq)]
pub enum RiskLevel {
    /// Low risk with minimal impact on safety
    Low,
    /// Medium risk requiring monitoring
    Medium,
    /// High risk that should be addressed promptly
    High,
    /// Critical risk requiring immediate attention
    Critical,
}
401
/// Buffer lifecycle tracking
#[derive(Debug, Clone)]
pub struct BufferLifecycle {
    /// Unique identifier for this buffer
    pub buffer_id: String,
    /// Timestamp when buffer was created
    pub created_at: Instant,
    /// History of size changes with timestamps
    pub size_changes: Vec<(Instant, usize)>,
    /// History of access operations with timestamps
    pub access_pattern: Vec<(Instant, AccessType)>,
    /// Current state of the buffer
    pub current_state: BufferState,
    /// Expected lifetime if known, `None` when unbounded/unknown
    pub expected_lifetime: Option<Duration>,
}
418
/// Type of buffer access
#[derive(Debug, Clone, PartialEq)]
pub enum AccessType {
    /// Read operation from buffer
    Read,
    /// Write operation to buffer
    Write,
    /// Buffer was resized
    Resize,
    /// Buffer was cloned
    Clone,
    /// Buffer ownership was moved
    Move,
}
433
/// Current state of buffer
#[derive(Debug, Clone, PartialEq)]
pub enum BufferState {
    /// Buffer is active and available for use
    Active,
    /// Buffer is currently borrowed
    Borrowed,
    /// Buffer ownership has been moved
    Moved,
    /// Buffer has been dropped and deallocated
    Dropped,
}
446
447impl MemorySafetyAuditor {
448    /// Create new memory safety auditor
449    pub fn new(config: MemorySafetyConfig) -> Self {
450        Self {
451            allocation_tracker: Arc::new(RwLock::new(AllocationTracker::default())),
452            reference_tracker: Arc::new(RwLock::new(ReferenceTracker::default())),
453            buffer_safety_monitor: Arc::new(RwLock::new(BufferSafetyMonitor::default())),
454            audit_config: config,
455        }
456    }
457
458    /// Start periodic memory safety audit
459    pub async fn start_periodic_audit(&self) -> Result<()> {
460        if !self.audit_config.enable_allocation_tracking
461            && !self.audit_config.enable_reference_cycle_detection
462            && !self.audit_config.enable_buffer_bounds_checking
463        {
464            return Ok(()); // No auditing enabled
465        }
466
467        let auditor = Self {
468            allocation_tracker: Arc::clone(&self.allocation_tracker),
469            reference_tracker: Arc::clone(&self.reference_tracker),
470            buffer_safety_monitor: Arc::clone(&self.buffer_safety_monitor),
471            audit_config: self.audit_config.clone(),
472        };
473
474        tokio::spawn(async move {
475            let mut interval = tokio::time::interval(auditor.audit_config.audit_interval);
476
477            loop {
478                interval.tick().await;
479
480                if let Err(e) = auditor.perform_audit().await {
481                    warn!("Memory safety audit failed: {}", e);
482                }
483            }
484        });
485
486        info!(
487            "Started periodic memory safety audit with interval: {:?}",
488            self.audit_config.audit_interval
489        );
490        Ok(())
491    }
492
493    /// Perform comprehensive memory safety audit
494    pub async fn perform_audit(&self) -> Result<MemorySafetyReport> {
495        let mut report = MemorySafetyReport::default();
496
497        // Audit memory allocations
498        if self.audit_config.enable_allocation_tracking {
499            report.allocation_audit = Some(self.audit_allocations().await?);
500        }
501
502        // Audit reference cycles
503        if self.audit_config.enable_reference_cycle_detection {
504            report.reference_audit = Some(self.audit_references().await?);
505        }
506
507        // Audit buffer safety
508        if self.audit_config.enable_buffer_bounds_checking {
509            report.buffer_audit = Some(self.audit_buffers().await?);
510        }
511
512        // Calculate overall safety score
513        report.overall_safety_score = self.calculate_safety_score(&report);
514        report.audit_timestamp = Instant::now();
515
516        // Apply automatic cleanup if enabled
517        if self.audit_config.enable_automatic_cleanup {
518            self.apply_automatic_cleanup(&report).await?;
519        }
520
521        Ok(report)
522    }
523
524    /// Audit memory allocations for leaks and patterns
525    async fn audit_allocations(&self) -> Result<AllocationAuditResult> {
526        let tracker = self.allocation_tracker.read().await;
527        let mut result = AllocationAuditResult::default();
528
529        // Check for memory leaks
530        let current_time = Instant::now();
531        for (id, alloc_info) in &tracker.active_allocations {
532            let age = current_time.duration_since(alloc_info.timestamp);
533
534            // Consider allocations older than 5 minutes as potential leaks
535            if age > Duration::from_secs(300) {
536                let severity = match alloc_info.size {
537                    size if size > 100 * 1024 * 1024 => LeakSeverity::Critical, // > 100MB
538                    size if size > 10 * 1024 * 1024 => LeakSeverity::High,      // > 10MB
539                    size if size > 1024 * 1024 => LeakSeverity::Medium,         // > 1MB
540                    _ => LeakSeverity::Low,
541                };
542
543                let leak = MemoryLeak {
544                    leak_id: format!("leak_{}", id),
545                    allocation_info: alloc_info.clone(),
546                    leak_detected_at: current_time,
547                    estimated_leak_duration: age,
548                    severity,
549                };
550
551                result.detected_leaks.push(leak);
552            }
553        }
554
555        // Calculate statistics
556        result.total_active_allocations = tracker.active_allocations.len();
557        result.current_memory_usage = tracker.current_memory_usage.load(Ordering::Relaxed);
558        result.peak_memory_usage = tracker.peak_memory_usage.load(Ordering::Relaxed);
559        result.allocation_patterns = tracker.allocation_patterns.clone();
560
561        // Check if memory usage exceeds threshold
562        if result.current_memory_usage > self.audit_config.max_memory_threshold {
563            result.memory_threshold_exceeded = true;
564            warn!(
565                "Memory usage ({} bytes) exceeds threshold ({} bytes)",
566                result.current_memory_usage, self.audit_config.max_memory_threshold
567            );
568        }
569
570        Ok(result)
571    }
572
573    /// Audit reference cycles and ownership patterns
574    async fn audit_references(&self) -> Result<ReferenceAuditResult> {
575        let tracker = self.reference_tracker.read().await;
576        let mut result = ReferenceAuditResult::default();
577
578        // Detect potential reference cycles using simple graph traversal
579        result.detected_cycles = tracker.detected_cycles.clone();
580        result.active_strong_references = tracker.strong_references.len();
581        result.active_weak_references = tracker.weak_references.len();
582
583        // Check for orphaned references (strong references with no activity)
584        let current_time = Instant::now();
585        for (id, ref_info) in &tracker.strong_references {
586            let idle_time = current_time.duration_since(ref_info.last_accessed);
587            if idle_time > Duration::from_secs(600) {
588                // 10 minutes idle
589                result.orphaned_references.push(ref_info.clone());
590            }
591        }
592
593        // Analyze reference patterns
594        result.reference_patterns = tracker.reference_patterns.clone();
595
596        Ok(result)
597    }
598
599    /// Audit buffer safety and bounds checking
600    async fn audit_buffers(&self) -> Result<BufferAuditResult> {
601        let monitor = self.buffer_safety_monitor.read().await;
602        let mut result = BufferAuditResult::default();
603
604        result.bounds_violations = monitor.bounds_violations.clone();
605        result.unsafe_operations = monitor.unsafe_operations.clone();
606        result.buffer_statistics = monitor.buffer_stats.clone();
607
608        // Analyze buffer lifecycle patterns
609        for (id, lifecycle) in &monitor.buffer_lifecycle {
610            if lifecycle.current_state == BufferState::Dropped {
611                continue; // Skip dropped buffers
612            }
613
614            // Check for long-lived buffers that might be leaked
615            let age = Instant::now().duration_since(lifecycle.created_at);
616            if age > Duration::from_secs(1800) {
617                // 30 minutes
618                result.long_lived_buffers.push(lifecycle.clone());
619            }
620        }
621
622        Ok(result)
623    }
624
625    /// Calculate overall safety score based on audit results
626    fn calculate_safety_score(&self, report: &MemorySafetyReport) -> f64 {
627        let mut score = 100.0;
628
629        if let Some(ref alloc_audit) = report.allocation_audit {
630            // Deduct points for memory leaks
631            for leak in &alloc_audit.detected_leaks {
632                let deduction = match leak.severity {
633                    LeakSeverity::Critical => 25.0,
634                    LeakSeverity::High => 15.0,
635                    LeakSeverity::Medium => 8.0,
636                    LeakSeverity::Low => 3.0,
637                };
638                score -= deduction;
639            }
640
641            // Deduct points for memory threshold exceeded
642            if alloc_audit.memory_threshold_exceeded {
643                score -= 20.0;
644            }
645        }
646
647        if let Some(ref ref_audit) = report.reference_audit {
648            // Deduct points for reference cycles
649            score -= ref_audit.detected_cycles.len() as f64 * 10.0;
650
651            // Deduct points for orphaned references
652            score -= ref_audit.orphaned_references.len() as f64 * 5.0;
653        }
654
655        if let Some(ref buf_audit) = report.buffer_audit {
656            // Deduct points for bounds violations
657            for violation in &buf_audit.bounds_violations {
658                let deduction = match violation.severity {
659                    ViolationSeverity::Critical => 30.0,
660                    ViolationSeverity::Error => 15.0,
661                    ViolationSeverity::Warning => 5.0,
662                };
663                score -= deduction;
664            }
665
666            // Deduct points for unsafe operations
667            for operation in &buf_audit.unsafe_operations {
668                let deduction = match operation.risk_level {
669                    RiskLevel::Critical => 25.0,
670                    RiskLevel::High => 15.0,
671                    RiskLevel::Medium => 8.0,
672                    RiskLevel::Low => 3.0,
673                };
674                score -= deduction;
675            }
676        }
677
678        // Ensure score doesn't go negative
679        score.max(0.0)
680    }
681
682    /// Apply automatic cleanup based on audit results
683    async fn apply_automatic_cleanup(&self, report: &MemorySafetyReport) -> Result<()> {
684        if let Some(ref alloc_audit) = report.allocation_audit {
685            // Clean up low-severity leaks that are very old
686            let mut tracker = self.allocation_tracker.write().await;
687            let mut cleaned_up = 0;
688
689            let current_time = Instant::now();
690            let old_allocations: Vec<String> = tracker
691                .active_allocations
692                .iter()
693                .filter(|(_, alloc)| {
694                    let age = current_time.duration_since(alloc.timestamp);
695                    age > Duration::from_secs(3600) && alloc.size < 1024 * 1024 // 1 hour old and < 1MB
696                })
697                .map(|(id, _)| id.clone())
698                .collect();
699
700            for id in old_allocations {
701                if let Some(alloc_info) = tracker.active_allocations.remove(&id) {
702                    let current_usage = tracker.current_memory_usage.load(Ordering::Relaxed);
703                    tracker.current_memory_usage.store(
704                        current_usage.saturating_sub(alloc_info.size),
705                        Ordering::Relaxed,
706                    );
707                    tracker.total_deallocations.fetch_add(1, Ordering::Relaxed);
708                    cleaned_up += 1;
709                }
710            }
711
712            if cleaned_up > 0 {
713                info!("Automatic cleanup removed {} old allocations", cleaned_up);
714            }
715        }
716
717        Ok(())
718    }
719
720    /// Track a new memory allocation
721    pub async fn track_allocation(
722        &self,
723        allocation_id: String,
724        size: u64,
725        location: String,
726        allocation_type: AllocationType,
727    ) -> Result<()> {
728        if !self.audit_config.enable_allocation_tracking {
729            return Ok(());
730        }
731
732        let mut tracker = self.allocation_tracker.write().await;
733
734        let alloc_info = AllocationInfo {
735            allocation_id: allocation_id.clone(),
736            size,
737            timestamp: Instant::now(),
738            location,
739            allocation_type,
740            thread_id: std::thread::current().id(),
741        };
742
743        tracker.active_allocations.insert(allocation_id, alloc_info);
744        tracker.total_allocations.fetch_add(1, Ordering::Relaxed);
745
746        let new_usage = tracker
747            .current_memory_usage
748            .fetch_add(size, Ordering::Relaxed)
749            + size;
750
751        // Update peak memory usage
752        let current_peak = tracker.peak_memory_usage.load(Ordering::Relaxed);
753        if new_usage > current_peak {
754            tracker
755                .peak_memory_usage
756                .store(new_usage, Ordering::Relaxed);
757        }
758
759        Ok(())
760    }
761
762    /// Track deallocation of memory
763    pub async fn track_deallocation(&self, allocation_id: &str) -> Result<()> {
764        if !self.audit_config.enable_allocation_tracking {
765            return Ok(());
766        }
767
768        let mut tracker = self.allocation_tracker.write().await;
769
770        if let Some(alloc_info) = tracker.active_allocations.remove(allocation_id) {
771            tracker.total_deallocations.fetch_add(1, Ordering::Relaxed);
772            let current_usage = tracker.current_memory_usage.load(Ordering::Relaxed);
773            tracker.current_memory_usage.store(
774                current_usage.saturating_sub(alloc_info.size),
775                Ordering::Relaxed,
776            );
777        }
778
779        Ok(())
780    }
781
782    /// Get current memory safety status snapshot
783    pub async fn get_safety_status(&self) -> MemorySafetyStatus {
784        let allocation_tracker = self.allocation_tracker.read().await;
785        let reference_tracker = self.reference_tracker.read().await;
786        let buffer_monitor = self.buffer_safety_monitor.read().await;
787
788        MemorySafetyStatus {
789            current_memory_usage: allocation_tracker
790                .current_memory_usage
791                .load(Ordering::Relaxed),
792            active_allocations: allocation_tracker.active_allocations.len(),
793            detected_leaks: allocation_tracker.detected_leaks.len(),
794            active_references: reference_tracker.strong_references.len(),
795            detected_cycles: reference_tracker.detected_cycles.len(),
796            bounds_violations: buffer_monitor.bounds_violations.len(),
797            unsafe_operations: buffer_monitor.unsafe_operations.len(),
798            last_audit: Instant::now(), // This would be stored properly in a real implementation
799        }
800    }
801}
802
/// Complete memory safety audit report
#[derive(Debug)]
pub struct MemorySafetyReport {
    /// Results from memory allocation audit, `None` when that audit is disabled
    pub allocation_audit: Option<AllocationAuditResult>,
    /// Results from reference cycle audit, `None` when that audit is disabled
    pub reference_audit: Option<ReferenceAuditResult>,
    /// Results from buffer safety audit, `None` when that audit is disabled
    pub buffer_audit: Option<BufferAuditResult>,
    /// Overall safety score from 0.0 to 100.0 (higher is safer)
    pub overall_safety_score: f64,
    /// Timestamp when audit was performed
    pub audit_timestamp: Instant,
}
817
818impl Default for MemorySafetyReport {
819    fn default() -> Self {
820        Self {
821            allocation_audit: None,
822            reference_audit: None,
823            buffer_audit: None,
824            overall_safety_score: 0.0,
825            audit_timestamp: Instant::now(),
826        }
827    }
828}
829
/// Result of allocation audit
#[derive(Debug, Default)]
pub struct AllocationAuditResult {
    /// List of detected memory leaks
    pub detected_leaks: Vec<MemoryLeak>,
    /// Number of currently active allocations
    pub total_active_allocations: usize,
    /// Current total memory usage in bytes
    pub current_memory_usage: u64,
    /// Peak memory usage observed in bytes
    pub peak_memory_usage: u64,
    /// Identified allocation patterns keyed by pattern name
    pub allocation_patterns: HashMap<String, AllocationPattern>,
    /// Whether memory usage exceeded the configured threshold
    pub memory_threshold_exceeded: bool,
}
846
/// Result of reference audit
#[derive(Debug, Default)]
pub struct ReferenceAuditResult {
    /// List of detected reference cycles
    pub detected_cycles: Vec<ReferenceCycle>,
    /// Number of active strong references
    pub active_strong_references: usize,
    /// Number of active weak references
    pub active_weak_references: usize,
    /// References that appear to be orphaned (long idle)
    pub orphaned_references: Vec<ReferenceInfo>,
    /// Identified reference usage patterns keyed by pattern name
    pub reference_patterns: HashMap<String, ReferencePattern>,
}
861
/// Result of buffer audit
///
/// Snapshot of buffer health: bounds violations, unsafe operations, per-buffer
/// statistics, and buffers that have outlived the expected lifetime.
#[derive(Debug, Default)]
pub struct BufferAuditResult {
    /// List of detected bounds violations
    pub bounds_violations: Vec<BoundsViolation>,
    /// List of detected unsafe operations
    pub unsafe_operations: Vec<UnsafeOperation>,
    /// Statistics for all tracked buffers, keyed by buffer identifier
    pub buffer_statistics: HashMap<String, BufferStats>,
    /// Buffers that have existed for an unusually long time
    pub long_lived_buffers: Vec<BufferLifecycle>,
}
874
/// Current memory safety status
///
/// Lightweight point-in-time counters, cheaper than a full
/// [`MemorySafetyReport`] audit.
#[derive(Debug)]
pub struct MemorySafetyStatus {
    /// Current total memory usage in bytes
    pub current_memory_usage: u64,
    /// Number of currently active allocations
    pub active_allocations: usize,
    /// Number of detected memory leaks
    pub detected_leaks: usize,
    /// Number of active references being tracked
    pub active_references: usize,
    /// Number of detected reference cycles
    pub detected_cycles: usize,
    /// Number of buffer bounds violations
    pub bounds_violations: usize,
    /// Number of detected unsafe operations
    pub unsafe_operations: usize,
    /// Timestamp of last audit performed
    /// NOTE(review): the visible constructor sets this to `Instant::now()` at
    /// query time rather than the actual last audit time — confirm.
    pub last_audit: Instant,
}
895
/// Thread-safe model manager for voice conversion
///
/// LRU-style model cache behind async locks: lookups take the `models` read
/// lock, loads are throttled by `loading_semaphore`, and eviction decisions
/// are driven by the per-model data in `usage_tracker`.
pub struct ThreadSafeModelManager {
    /// Cached models with thread-safe access, keyed by conversion type
    models: Arc<RwLock<HashMap<ConversionType, Arc<ConversionModel>>>>,
    /// Model loading semaphore to prevent resource exhaustion during loads
    loading_semaphore: Arc<Semaphore>,
    /// Model access statistics (hits, misses, load times)
    stats: Arc<RwLock<ModelAccessStats>>,
    /// Maximum number of cached models before eviction is triggered
    max_cached_models: usize,
    /// Model usage tracking for eviction decisions (LRU + access count)
    usage_tracker: Arc<RwLock<HashMap<ConversionType, ModelUsageInfo>>>,
}
909
/// Model access statistics for monitoring
#[derive(Debug, Clone, Default)]
pub struct ModelAccessStats {
    /// Number of times a model was found in cache
    pub cache_hits: u64,
    /// Number of times a model was not found in cache
    pub cache_misses: u64,
    /// Total number of models loaded from storage
    pub models_loaded: u64,
    /// Number of models removed from cache (eviction, cleanup, or clear)
    pub models_evicted: u64,
    /// Number of concurrent model loading operations
    /// NOTE(review): `get_model` increments this once per guarded load and
    /// never decrements it, so it counts cumulative loads rather than
    /// in-flight ones — confirm intended semantics.
    pub concurrent_loads: u64,
    /// Running average time taken to load a model
    pub average_load_time: Duration,
    /// Timestamp of last cache cleanup operation (`None` until first cleanup)
    pub last_cleanup: Option<Instant>,
}
928
/// Model usage information for cache management
///
/// One entry per cached model; `last_accessed` and `access_count` drive the
/// LRU eviction in `ThreadSafeModelManager`.
#[derive(Debug, Clone)]
pub struct ModelUsageInfo {
    /// When the model was last accessed
    pub last_accessed: Instant,
    /// Total number of times this model has been accessed
    pub access_count: u32,
    /// Cumulative processing time for all operations
    /// NOTE(review): initialized to zero on load and not updated by any code
    /// visible in this module — confirm who maintains it.
    pub total_processing_time: Duration,
    /// Average processing time per operation (see note above)
    pub average_processing_time: Duration,
    /// Estimated memory usage in bytes (fixed 100MB estimate at load time)
    pub memory_usage_estimate: u64,
}
943
impl ThreadSafeModelManager {
    /// Create new thread-safe model manager
    ///
    /// `max_cached_models` bounds the cache size; once full, the least
    /// recently used model is evicted to make room for a new one.
    pub fn new(max_cached_models: usize) -> Self {
        Self {
            models: Arc::new(RwLock::new(HashMap::new())),
            loading_semaphore: Arc::new(Semaphore::new(2)), // Max 2 concurrent model loads
            stats: Arc::new(RwLock::new(ModelAccessStats::default())),
            max_cached_models,
            usage_tracker: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Get model with thread-safe access and cache management
    ///
    /// Fast path: read lock + cache hit. Slow path: acquire a loading permit,
    /// re-check the cache, load the model, then insert it (evicting the least
    /// recently used entry when the cache is full).
    ///
    /// NOTE(review): the re-check after acquiring the permit is done under a
    /// read lock and the semaphore has 2 permits, so two tasks missing on the
    /// same type can both load it; the second insert overwrites the first and
    /// resets its usage info. Harmless for a cache, but confirm acceptable.
    pub async fn get_model(
        &self,
        conversion_type: &ConversionType,
    ) -> Result<Option<Arc<ConversionModel>>> {
        // Try to get from cache first (read lock released at end of scope)
        {
            let models_guard = self.models.read().await;
            if let Some(model) = models_guard.get(conversion_type) {
                // Update access statistics
                self.update_access_stats(conversion_type, true).await;
                return Ok(Some(Arc::clone(model)));
            }
        }

        // Model not in cache, need to load it
        self.update_access_stats(conversion_type, false).await;

        // Use semaphore to limit concurrent loading (bounds memory spikes);
        // the permit is held until this function returns.
        let _permit = self
            .loading_semaphore
            .acquire()
            .await
            .map_err(|e| Error::runtime(format!("Failed to acquire loading permit: {}", e)))?;

        // Double-check pattern: another thread might have loaded it while we waited
        {
            let models_guard = self.models.read().await;
            if let Some(model) = models_guard.get(conversion_type) {
                self.update_access_stats(conversion_type, true).await;
                return Ok(Some(Arc::clone(model)));
            }
        }

        // Load the model (this would be implemented based on specific model loading logic)
        debug!("Loading model for conversion type: {:?}", conversion_type);
        let start_time = Instant::now();

        // Simulate model loading - in real implementation this would load actual models
        let model = self.load_model_impl(conversion_type).await?;

        let load_time = start_time.elapsed();

        // Update cache with new model. The model map and the usage tracker are
        // locked together (models first, then usage) so they stay consistent.
        {
            let mut models_guard = self.models.write().await;
            let mut usage_guard = self.usage_tracker.write().await;

            // Check if we need to evict old models
            if models_guard.len() >= self.max_cached_models {
                self.evict_least_used_model(&mut models_guard, &mut usage_guard)
                    .await;
            }

            // Insert new model
            let model_arc = Arc::new(model);
            models_guard.insert(conversion_type.clone(), Arc::clone(&model_arc));

            // Track usage (fresh entry; access_count starts at 1 for this load)
            usage_guard.insert(
                conversion_type.clone(),
                ModelUsageInfo {
                    last_accessed: Instant::now(),
                    access_count: 1,
                    total_processing_time: Duration::from_millis(0),
                    average_processing_time: Duration::from_millis(0),
                    memory_usage_estimate: 100 * 1024 * 1024, // 100MB estimate
                },
            );

            // Update load statistics
            {
                let mut stats_guard = self.stats.write().await;
                stats_guard.models_loaded += 1;
                stats_guard.concurrent_loads += 1;
                // Incremental running average: avg' = (avg * (n - 1) + t) / n
                stats_guard.average_load_time = if stats_guard.models_loaded == 1 {
                    load_time
                } else {
                    Duration::from_nanos(
                        (stats_guard.average_load_time.as_nanos() as u64
                            * (stats_guard.models_loaded - 1)
                            + load_time.as_nanos() as u64)
                            / stats_guard.models_loaded,
                    )
                };
            }

            Ok(Some(model_arc))
        }
    }

    /// Load model implementation for given conversion type with simulated loading time
    ///
    /// Maps each conversion type to a model architecture; unlisted types fall
    /// back to `ModelType::Custom`.
    async fn load_model_impl(&self, conversion_type: &ConversionType) -> Result<ConversionModel> {
        // This is a placeholder - in real implementation, this would load the actual model
        tokio::time::sleep(Duration::from_millis(100)).await; // Simulate loading time

        let model_type = match conversion_type {
            ConversionType::SpeakerConversion => crate::models::ModelType::NeuralVC,
            ConversionType::AgeTransformation => crate::models::ModelType::NeuralVC,
            ConversionType::GenderTransformation => crate::models::ModelType::NeuralVC,
            ConversionType::VoiceMorphing => crate::models::ModelType::AutoVC,
            ConversionType::EmotionalTransformation => crate::models::ModelType::Transformer,
            _ => crate::models::ModelType::Custom,
        };

        Ok(ConversionModel::new(model_type))
    }

    /// Evict least recently used model from cache to make space for new model
    ///
    /// Caller must already hold write guards on both maps; ties on
    /// `last_accessed` are broken by the lower `access_count`.
    async fn evict_least_used_model(
        &self,
        models_guard: &mut HashMap<ConversionType, Arc<ConversionModel>>,
        usage_guard: &mut HashMap<ConversionType, ModelUsageInfo>,
    ) {
        if let Some((least_used_type, _)) = usage_guard
            .iter()
            .min_by_key(|(_, usage)| (usage.last_accessed, usage.access_count))
        {
            // Clone the key so the immutable borrow from `iter()` ends before
            // we mutate the maps.
            let evicted_type = least_used_type.clone();
            models_guard.remove(&evicted_type);
            usage_guard.remove(&evicted_type);

            // Update statistics
            {
                let mut stats_guard = self.stats.write().await;
                stats_guard.models_evicted += 1;
            }

            debug!("Evicted least used model: {:?}", evicted_type);
        }
    }

    /// Update access statistics and usage tracker after model access
    ///
    /// On a cache miss the usage tracker has no entry yet, so only the
    /// hit/miss counter changes.
    async fn update_access_stats(&self, conversion_type: &ConversionType, cache_hit: bool) {
        let mut stats_guard = self.stats.write().await;
        if cache_hit {
            stats_guard.cache_hits += 1;
        } else {
            stats_guard.cache_misses += 1;
        }

        // Update usage tracker (drop the stats lock first so we never hold
        // both locks at once)
        drop(stats_guard);
        let mut usage_guard = self.usage_tracker.write().await;
        if let Some(usage_info) = usage_guard.get_mut(conversion_type) {
            usage_info.last_accessed = Instant::now();
            usage_info.access_count += 1;
        }
    }

    /// Get current model access statistics snapshot
    pub async fn get_stats(&self) -> ModelAccessStats {
        self.stats.read().await.clone()
    }

    /// Clear all cached models from memory and update statistics
    ///
    /// Every removed model is counted as an eviction. Models still referenced
    /// elsewhere via `Arc` remain alive until those references drop.
    pub async fn clear_cache(&self) {
        let mut models_guard = self.models.write().await;
        let mut usage_guard = self.usage_tracker.write().await;

        let evicted_count = models_guard.len();
        models_guard.clear();
        usage_guard.clear();

        // Update statistics
        {
            let mut stats_guard = self.stats.write().await;
            stats_guard.models_evicted += evicted_count as u64;
        }

        info!(
            "Cleared all cached models: {} models evicted",
            evicted_count
        );
    }

    /// Perform periodic cleanup of models that have been idle beyond specified duration
    ///
    /// Removes every model whose `last_accessed` is older than `max_idle_time`
    /// and records the cleanup timestamp when anything was evicted.
    pub async fn cleanup_unused_models(&self, max_idle_time: Duration) {
        let now = Instant::now();
        let mut models_guard = self.models.write().await;
        let mut usage_guard = self.usage_tracker.write().await;

        // Collect keys first to avoid mutating while iterating.
        let mut to_remove = Vec::new();
        for (conversion_type, usage_info) in usage_guard.iter() {
            if now.duration_since(usage_info.last_accessed) > max_idle_time {
                to_remove.push(conversion_type.clone());
            }
        }

        let mut evicted_count = 0;
        for conversion_type in to_remove {
            models_guard.remove(&conversion_type);
            usage_guard.remove(&conversion_type);
            evicted_count += 1;
            debug!("Evicted idle model: {:?}", conversion_type);
        }

        if evicted_count > 0 {
            // Update statistics
            {
                let mut stats_guard = self.stats.write().await;
                stats_guard.models_evicted += evicted_count;
                stats_guard.last_cleanup = Some(now);
            }

            info!("Cleanup evicted {} idle models", evicted_count);
        }
    }
}
1165
/// Thread-safe conversion operation guard
///
/// RAII-style handle for one conversion: holds a semaphore permit for its
/// lifetime (bounding concurrency) and keeps the operation registered in the
/// shared [`OperationState`] until finalized or dropped.
pub struct OperationGuard {
    /// Shared operation state where this operation is registered
    operation_state: Arc<RwLock<OperationState>>,
    /// Semaphore permit for this operation (released on drop)
    _permit: OwnedSemaphorePermit,
    /// Operation identifier (key into `active_operations`)
    operation_id: String,
    /// Start time for performance tracking
    start_time: Instant,
}
1177
/// Operation state tracking
///
/// Shared ledger of in-flight and completed operations, maintained by
/// [`OperationGuard`].
#[derive(Debug, Default, Clone)]
pub struct OperationState {
    /// Currently active operations mapped by operation ID
    pub active_operations: HashMap<String, OperationInfo>,
    /// Total number of completed operations
    pub completed_operations: u64,
    /// Total number of failed operations
    pub failed_operations: u64,
    /// Running average duration of completed operations (failures excluded)
    pub average_duration: Duration,
}
1190
/// Information about an active operation
#[derive(Debug, Clone)]
pub struct OperationInfo {
    /// Unique identifier for this operation
    pub operation_id: String,
    /// Type of conversion being performed
    pub conversion_type: ConversionType,
    /// When the operation started
    pub start_time: Instant,
    /// ID of the thread that created the guard for this operation
    pub thread_id: std::thread::ThreadId,
    /// Current status of the operation
    pub status: OperationStatus,
}
1205
/// Operation status
///
/// Lifecycle: `Starting` → `Processing` → `Finalizing` → `Completed` or
/// `Failed`.
#[derive(Debug, Clone, PartialEq)]
pub enum OperationStatus {
    /// Operation is being initialized and preparing resources
    Starting,
    /// Operation is actively processing data
    Processing,
    /// Operation is in final cleanup and finalization phase
    Finalizing,
    /// Operation completed successfully
    Completed,
    /// Operation failed with error message describing the failure
    Failed(String),
}
1220
1221impl OperationGuard {
1222    /// Create new operation guard with semaphore-based concurrency control
1223    pub async fn new(
1224        operation_state: Arc<RwLock<OperationState>>,
1225        semaphore: Arc<Semaphore>,
1226        operation_id: String,
1227        conversion_type: ConversionType,
1228    ) -> Result<Self> {
1229        let permit = semaphore
1230            .acquire_owned()
1231            .await
1232            .map_err(|e| Error::runtime(format!("Failed to acquire operation permit: {}", e)))?;
1233
1234        let start_time = Instant::now();
1235
1236        // Register operation
1237        {
1238            let mut state_guard = operation_state.write().await;
1239            state_guard.active_operations.insert(
1240                operation_id.clone(),
1241                OperationInfo {
1242                    operation_id: operation_id.clone(),
1243                    conversion_type,
1244                    start_time,
1245                    thread_id: std::thread::current().id(),
1246                    status: OperationStatus::Starting,
1247                },
1248            );
1249        }
1250
1251        Ok(Self {
1252            operation_state,
1253            _permit: permit,
1254            operation_id,
1255            start_time,
1256        })
1257    }
1258
1259    /// Update operation status in shared state for monitoring
1260    pub async fn update_status(&self, status: OperationStatus) {
1261        let mut state_guard = self.operation_state.write().await;
1262        if let Some(op_info) = state_guard.active_operations.get_mut(&self.operation_id) {
1263            op_info.status = status;
1264        }
1265    }
1266
1267    /// Mark operation as successfully completed and update statistics
1268    pub async fn complete(&self) {
1269        self.finalize_operation(OperationStatus::Completed).await;
1270    }
1271
1272    /// Mark operation as failed with error message and update statistics
1273    pub async fn fail(&self, error: String) {
1274        self.finalize_operation(OperationStatus::Failed(error))
1275            .await;
1276    }
1277
1278    /// Finalize operation with given status and update duration statistics
1279    async fn finalize_operation(&self, final_status: OperationStatus) {
1280        let duration = self.start_time.elapsed();
1281        let mut state_guard = self.operation_state.write().await;
1282
1283        // Remove from active operations
1284        state_guard.active_operations.remove(&self.operation_id);
1285
1286        // Update statistics
1287        match final_status {
1288            OperationStatus::Completed => {
1289                state_guard.completed_operations += 1;
1290
1291                // Update average duration
1292                let total_ops = state_guard.completed_operations;
1293                if total_ops == 1 {
1294                    state_guard.average_duration = duration;
1295                } else {
1296                    let total_nanos = state_guard.average_duration.as_nanos() as u64
1297                        * (total_ops - 1)
1298                        + duration.as_nanos() as u64;
1299                    state_guard.average_duration = Duration::from_nanos(total_nanos / total_ops);
1300                }
1301            }
1302            OperationStatus::Failed(_) => {
1303                state_guard.failed_operations += 1;
1304            }
1305            _ => {}
1306        }
1307
1308        debug!(
1309            "Operation {} finalized with status {:?} in {:?}",
1310            self.operation_id, final_status, duration
1311        );
1312    }
1313}
1314
1315impl Drop for OperationGuard {
1316    fn drop(&mut self) {
1317        // Ensure operation is removed from active operations on drop
1318        let operation_state = Arc::clone(&self.operation_state);
1319        let operation_id = self.operation_id.clone();
1320
1321        tokio::spawn(async move {
1322            let mut state_guard = operation_state.write().await;
1323            state_guard.active_operations.remove(&operation_id);
1324        });
1325    }
1326}
1327
/// Thread-safe concurrent conversion manager
///
/// Front door for conversion requests: admission is controlled by
/// `operation_semaphore`, per-operation bookkeeping lives in
/// `operation_state`, and models are served from the shared `model_manager`.
pub struct ConcurrentConversionManager {
    /// Shared operation state (active operations + completion counters)
    operation_state: Arc<RwLock<OperationState>>,
    /// Semaphore for limiting concurrent operations
    operation_semaphore: Arc<Semaphore>,
    /// Model manager for thread-safe model access
    model_manager: Arc<ThreadSafeModelManager>,
    /// Configuration with thread-safe access (updatable at runtime)
    config: Arc<RwLock<ConversionConfig>>,
    /// Performance metrics
    metrics: Arc<RwLock<ConcurrentConversionMetrics>>,
}
1341
/// Metrics for concurrent conversion operations
///
/// Maintained by `convert_with_concurrency_control`; snapshots are returned
/// by `ConcurrentConversionManager::get_metrics`.
#[derive(Debug, Default, Clone)]
pub struct ConcurrentConversionMetrics {
    /// Total number of conversion requests received
    pub total_requests: u64,
    /// Number of conversions that completed successfully
    pub successful_conversions: u64,
    /// Number of conversions that failed
    pub failed_conversions: u64,
    /// Running average time spent waiting for a concurrency permit
    pub average_queue_time: Duration,
    /// Running average time spent processing successful requests
    pub average_processing_time: Duration,
    /// Maximum number of concurrent operations observed (high-water mark)
    pub peak_concurrent_operations: usize,
    /// Current number of active operations
    pub current_concurrent_operations: usize,
}
1360
1361impl ConcurrentConversionManager {
1362    /// Create new concurrent conversion manager with specified concurrency limits
1363    pub fn new(
1364        max_concurrent_operations: usize,
1365        max_cached_models: usize,
1366        config: ConversionConfig,
1367    ) -> Self {
1368        Self {
1369            operation_state: Arc::new(RwLock::new(OperationState::default())),
1370            operation_semaphore: Arc::new(Semaphore::new(max_concurrent_operations)),
1371            model_manager: Arc::new(ThreadSafeModelManager::new(max_cached_models)),
1372            config: Arc::new(RwLock::new(config)),
1373            metrics: Arc::new(RwLock::new(ConcurrentConversionMetrics::default())),
1374        }
1375    }
1376
1377    /// Process conversion request with thread-safe concurrency control and metrics tracking
1378    pub async fn convert_with_concurrency_control(
1379        &self,
1380        request: ConversionRequest,
1381    ) -> Result<ConversionResult> {
1382        let queue_start = Instant::now();
1383
1384        // Update metrics
1385        {
1386            let mut metrics_guard = self.metrics.write().await;
1387            metrics_guard.total_requests += 1;
1388        }
1389
1390        // Create operation guard for this conversion
1391        let operation_guard = OperationGuard::new(
1392            Arc::clone(&self.operation_state),
1393            Arc::clone(&self.operation_semaphore),
1394            request.id.clone(),
1395            request.conversion_type.clone(),
1396        )
1397        .await?;
1398
1399        let queue_time = queue_start.elapsed();
1400
1401        // Update queue time metrics
1402        {
1403            let mut metrics_guard = self.metrics.write().await;
1404            let total_requests = metrics_guard.total_requests;
1405            let current_avg = metrics_guard.average_queue_time;
1406
1407            metrics_guard.average_queue_time = if total_requests == 1 {
1408                queue_time
1409            } else {
1410                Duration::from_nanos(
1411                    (current_avg.as_nanos() as u64 * (total_requests - 1)
1412                        + queue_time.as_nanos() as u64)
1413                        / total_requests,
1414                )
1415            };
1416
1417            metrics_guard.current_concurrent_operations += 1;
1418            if metrics_guard.current_concurrent_operations
1419                > metrics_guard.peak_concurrent_operations
1420            {
1421                metrics_guard.peak_concurrent_operations =
1422                    metrics_guard.current_concurrent_operations;
1423            }
1424        }
1425
1426        operation_guard
1427            .update_status(OperationStatus::Processing)
1428            .await;
1429
1430        // Perform the actual conversion
1431        let conversion_result = match self
1432            .perform_safe_conversion(&request, &operation_guard)
1433            .await
1434        {
1435            Ok(result) => {
1436                operation_guard.complete().await;
1437
1438                // Update success metrics
1439                {
1440                    let mut metrics_guard = self.metrics.write().await;
1441                    metrics_guard.successful_conversions += 1;
1442                    metrics_guard.current_concurrent_operations -= 1;
1443                }
1444
1445                Ok(result)
1446            }
1447            Err(e) => {
1448                operation_guard.fail(e.to_string()).await;
1449
1450                // Update failure metrics
1451                {
1452                    let mut metrics_guard = self.metrics.write().await;
1453                    metrics_guard.failed_conversions += 1;
1454                    metrics_guard.current_concurrent_operations -= 1;
1455                }
1456
1457                Err(e)
1458            }
1459        };
1460
1461        conversion_result
1462    }
1463
1464    /// Perform the actual conversion with thread safety guarantees and model management
1465    async fn perform_safe_conversion(
1466        &self,
1467        request: &ConversionRequest,
1468        operation_guard: &OperationGuard,
1469    ) -> Result<ConversionResult> {
1470        let processing_start = Instant::now();
1471
1472        // Get model safely
1473        let model = self
1474            .model_manager
1475            .get_model(&request.conversion_type)
1476            .await?;
1477
1478        operation_guard
1479            .update_status(OperationStatus::Finalizing)
1480            .await;
1481
1482        // Perform conversion (this would use the actual conversion logic)
1483        let converted_audio = self.simulate_conversion(&request.source_audio).await?;
1484
1485        let processing_time = processing_start.elapsed();
1486
1487        // Update processing time metrics
1488        {
1489            let mut metrics_guard = self.metrics.write().await;
1490            let successful_conversions = metrics_guard.successful_conversions + 1; // +1 because we haven't incremented yet
1491            let current_avg = metrics_guard.average_processing_time;
1492
1493            metrics_guard.average_processing_time = if successful_conversions == 1 {
1494                processing_time
1495            } else {
1496                Duration::from_nanos(
1497                    (current_avg.as_nanos() as u64 * (successful_conversions - 1)
1498                        + processing_time.as_nanos() as u64)
1499                        / successful_conversions,
1500                )
1501            };
1502        }
1503
1504        // Create successful result
1505        Ok(ConversionResult {
1506            request_id: request.id.clone(),
1507            converted_audio,
1508            output_sample_rate: 22050, // Default sample rate
1509            quality_metrics: HashMap::new(),
1510            artifacts: None,
1511            objective_quality: None,
1512            processing_time,
1513            conversion_type: request.conversion_type.clone(),
1514            success: true,
1515            error_message: None,
1516            timestamp: std::time::SystemTime::now(),
1517        })
1518    }
1519
1520    /// Simulate conversion processing for testing and development purposes
1521    async fn simulate_conversion(&self, source_audio: &[f32]) -> Result<Vec<f32>> {
1522        // Simulate processing time
1523        tokio::time::sleep(Duration::from_millis(10)).await;
1524
1525        // Return processed audio (simplified)
1526        let mut result = source_audio.to_vec();
1527        for sample in &mut result {
1528            *sample *= 0.9; // Simple processing simulation
1529        }
1530
1531        Ok(result)
1532    }
1533
1534    /// Get current conversion metrics snapshot
1535    pub async fn get_metrics(&self) -> ConcurrentConversionMetrics {
1536        let metrics_guard = self.metrics.read().await;
1537        metrics_guard.clone()
1538    }
1539
1540    /// Get current operation state snapshot with active operations
1541    pub async fn get_operation_state(&self) -> OperationState {
1542        let state_guard = self.operation_state.read().await;
1543        state_guard.clone()
1544    }
1545
1546    /// Update configuration with thread-safe write lock
1547    pub async fn update_config(&self, new_config: ConversionConfig) -> Result<()> {
1548        let mut config_guard = self.config.write().await;
1549        *config_guard = new_config;
1550        info!("Configuration updated successfully");
1551        Ok(())
1552    }
1553
1554    /// Get current configuration snapshot with thread-safe read lock
1555    pub async fn get_config(&self) -> ConversionConfig {
1556        self.config.read().await.clone()
1557    }
1558
1559    /// Perform health check and return status metrics for monitoring
1560    pub async fn health_check(&self) -> HashMap<String, String> {
1561        let mut health = HashMap::new();
1562
1563        let metrics = self.get_metrics().await;
1564        let operation_state = self.get_operation_state().await;
1565        let model_stats = self.model_manager.get_stats().await;
1566
1567        health.insert("status".to_string(), "healthy".to_string());
1568        health.insert(
1569            "total_requests".to_string(),
1570            metrics.total_requests.to_string(),
1571        );
1572        health.insert(
1573            "success_rate".to_string(),
1574            format!(
1575                "{:.2}%",
1576                if metrics.total_requests > 0 {
1577                    (metrics.successful_conversions as f64 / metrics.total_requests as f64) * 100.0
1578                } else {
1579                    100.0
1580                }
1581            ),
1582        );
1583        health.insert(
1584            "active_operations".to_string(),
1585            operation_state.active_operations.len().to_string(),
1586        );
1587        health.insert(
1588            "cached_models".to_string(),
1589            format!(
1590                "{}/{}",
1591                model_stats.cache_hits + model_stats.cache_misses - model_stats.models_evicted,
1592                self.model_manager.max_cached_models
1593            ),
1594        );
1595        health.insert(
1596            "model_cache_hit_rate".to_string(),
1597            format!(
1598                "{:.2}%",
1599                if model_stats.cache_hits + model_stats.cache_misses > 0 {
1600                    (model_stats.cache_hits as f64
1601                        / (model_stats.cache_hits + model_stats.cache_misses) as f64)
1602                        * 100.0
1603                } else {
1604                    0.0
1605                }
1606            ),
1607        );
1608
1609        health
1610    }
1611
1612    /// Gracefully shutdown manager waiting for active operations to complete
1613    pub async fn shutdown(&self) -> Result<()> {
1614        info!("Starting graceful shutdown of concurrent conversion manager");
1615
1616        // Wait for all active operations to complete (with timeout)
1617        let shutdown_timeout = Duration::from_secs(30);
1618        let start_time = Instant::now();
1619
1620        while start_time.elapsed() < shutdown_timeout {
1621            let operation_state = self.operation_state.read().await;
1622            if operation_state.active_operations.is_empty() {
1623                break;
1624            }
1625            drop(operation_state);
1626
1627            debug!(
1628                "Waiting for {} active operations to complete",
1629                self.operation_state.read().await.active_operations.len()
1630            );
1631            tokio::time::sleep(Duration::from_millis(100)).await;
1632        }
1633
1634        // Clear model cache
1635        self.model_manager.clear_cache().await;
1636
1637        let final_metrics = self.get_metrics().await;
1638        info!(
1639            "Concurrent conversion manager shutdown complete. Final stats: {} total requests, {} successful, {} failed",
1640            final_metrics.total_requests, final_metrics.successful_conversions, final_metrics.failed_conversions
1641        );
1642
1643        Ok(())
1644    }
1645}
1646
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ConversionTarget, VoiceCharacteristics};

    /// A miss followed by a hit on the same key must show up in the stats.
    #[tokio::test]
    async fn test_thread_safe_model_manager() {
        let mgr = ThreadSafeModelManager::new(3);

        // First lookup: nothing cached yet, so the model gets loaded.
        let first = mgr.get_model(&ConversionType::PitchShift).await.unwrap();
        assert!(first.is_some());

        // Second lookup for the same type is served from the cache.
        let second = mgr.get_model(&ConversionType::PitchShift).await.unwrap();
        assert!(second.is_some());

        // One miss (initial load), one hit, one model resident.
        let stats = mgr.get_stats().await;
        assert_eq!(stats.cache_hits, 1);
        assert_eq!(stats.cache_misses, 1);
        assert_eq!(stats.models_loaded, 1);
    }

    /// The guard registers an operation while alive and deregisters on completion.
    #[tokio::test]
    async fn test_operation_guard() {
        let state = Arc::new(RwLock::new(OperationState::default()));
        let permits = Arc::new(Semaphore::new(1));

        let guard = OperationGuard::new(
            Arc::clone(&state),
            permits,
            "test_op".to_string(),
            ConversionType::PitchShift,
        )
        .await
        .unwrap();

        // While the guard is held, its id appears in the active set.
        assert!(state
            .read()
            .await
            .active_operations
            .contains_key("test_op"));

        guard.complete().await;

        // After completion the entry is gone and the counter is bumped.
        let snapshot = state.read().await;
        assert!(!snapshot.active_operations.contains_key("test_op"));
        assert_eq!(snapshot.completed_operations, 1);
    }

    /// A single request through the manager succeeds and updates metrics.
    #[tokio::test]
    async fn test_concurrent_conversion_manager() {
        let mgr = ConcurrentConversionManager::new(2, 3, ConversionConfig::default());

        let req = ConversionRequest::new(
            "test_request".to_string(),
            vec![0.1, -0.1, 0.2, -0.2],
            22050,
            ConversionType::PitchShift,
            ConversionTarget::new(VoiceCharacteristics::default()),
        );

        assert!(mgr.convert_with_concurrency_control(req).await.is_ok());

        let metrics = mgr.get_metrics().await;
        assert_eq!(metrics.total_requests, 1);
        assert_eq!(metrics.successful_conversions, 1);
        assert_eq!(metrics.failed_conversions, 0);
    }

    /// Five parallel requests against one shared manager all succeed.
    #[tokio::test]
    async fn test_concurrent_operations() {
        let mgr = Arc::new(ConcurrentConversionManager::new(
            3,
            2,
            ConversionConfig::default(),
        ));

        // Launch the requests concurrently on the tokio runtime.
        let tasks: Vec<_> = (0..5)
            .map(|i| {
                let mgr = Arc::clone(&mgr);
                tokio::spawn(async move {
                    let req = ConversionRequest::new(
                        format!("test_request_{}", i),
                        vec![0.1, -0.1, 0.2, -0.2],
                        22050,
                        ConversionType::PitchShift,
                        ConversionTarget::new(VoiceCharacteristics::default()),
                    );
                    mgr.convert_with_concurrency_control(req).await
                })
            })
            .collect();

        // Join every task and tally the successes.
        let mut ok_count = 0;
        for task in tasks {
            if task.await.unwrap().is_ok() {
                ok_count += 1;
            }
        }
        assert_eq!(ok_count, 5);

        let metrics = mgr.get_metrics().await;
        assert_eq!(metrics.total_requests, 5);
        assert_eq!(metrics.successful_conversions, 5);
    }

    /// Filling a size-2 cache and loading a third model evicts exactly one.
    #[tokio::test]
    async fn test_model_cache_eviction() {
        let mgr = ThreadSafeModelManager::new(2); // Small cache size

        // Two loads fill the cache to capacity.
        mgr.get_model(&ConversionType::PitchShift).await.unwrap();
        mgr.get_model(&ConversionType::SpeedTransformation)
            .await
            .unwrap();

        // A third distinct type forces an eviction.
        mgr.get_model(&ConversionType::GenderTransformation)
            .await
            .unwrap();

        assert_eq!(mgr.get_stats().await.models_evicted, 1);
    }

    /// The health report exposes status plus the expected metric keys.
    #[tokio::test]
    async fn test_health_check() {
        let mgr = ConcurrentConversionManager::new(2, 3, ConversionConfig::default());

        let health = mgr.health_check().await;
        assert_eq!(health.get("status"), Some(&"healthy".to_string()));
        for key in ["total_requests", "success_rate", "cached_models"] {
            assert!(health.contains_key(key));
        }
    }
}