Skip to main content

memscope_rs/metadata/
thread.rs

1//! Thread Registry - Thread metadata management
2//!
3//! This module provides thread registration and metadata tracking
4//! for the MetadataEngine.
5
6use crate::core::{MemScopeError, MemScopeResult};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
/// Thread information
///
/// One record of per-thread metadata. Records are created by
/// `ThreadRegistry::register_current_thread` and handed back to callers as
/// clones, so mutating a returned value does not affect the registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadInfo {
    /// Thread identifier: the `u64` the registry derived from the thread's
    /// `std::thread::ThreadId`; also the key the record is stored under
    pub thread_id: u64,
    /// Thread name (if available).
    /// As populated by the registry this is the `Debug` rendering of the
    /// `ThreadId`, not the name the thread was spawned with.
    pub thread_name: Option<String>,
    /// When this thread was created. The registry fills this with the
    /// registration time, in nanoseconds since the Unix epoch.
    pub created_at: u64,
    /// Number of allocations made by this thread
    pub allocation_count: usize,
    /// Total bytes allocated by this thread (running sum of recorded sizes)
    pub total_allocated: usize,
    /// Peak memory usage for this thread (largest value reported so far)
    pub peak_memory: usize,
    /// Whether this thread is still active; `true` on registration
    pub is_active: bool,
}
29
/// Thread Registry - manages thread metadata
///
/// Stores one `ThreadInfo` per registered thread and a separate counter for
/// handing out internal IDs. Each piece of state sits behind its own mutex,
/// so all methods take `&self`.
#[derive(Debug)]
pub struct ThreadRegistry {
    /// Registered threads, keyed by the `u64` returned from
    /// `register_current_thread`
    threads: Arc<Mutex<HashMap<u64, ThreadInfo>>>,
    /// Next available internal thread ID (starts at 1, see `new`)
    next_id: Arc<Mutex<u64>>,
}
38
39impl ThreadRegistry {
40    /// Create a new ThreadRegistry
41    pub fn new() -> Self {
42        Self {
43            threads: Arc::new(Mutex::new(HashMap::new())),
44            next_id: Arc::new(Mutex::new(1)),
45        }
46    }
47
48    pub fn next_id(&self) -> MemScopeResult<u64> {
49        let mut id = self.next_id.lock().map_err(|e| {
50            MemScopeError::system(
51                crate::core::error::SystemErrorType::Locking,
52                format!("Failed to acquire next_id lock: {}", e),
53            )
54        })?;
55        let current = *id;
56        *id = current.saturating_add(1);
57        Ok(current)
58    }
59
60    /// Register the current thread
61    pub fn register_current_thread(&self) -> MemScopeResult<u64> {
62        let thread_id = std::thread::current().id();
63        let hash = self.hash_thread_id(&thread_id);
64        let timestamp = std::time::SystemTime::now()
65            .duration_since(std::time::UNIX_EPOCH)
66            .unwrap_or_default()
67            .as_nanos() as u64;
68
69        let mut threads = self.threads.lock().map_err(|e| {
70            MemScopeError::system(
71                crate::core::error::SystemErrorType::Locking,
72                format!("Failed to acquire threads lock: {}", e),
73            )
74        })?;
75        threads.entry(hash).or_insert_with(|| ThreadInfo {
76            thread_id: hash,
77            thread_name: Some(format!("{:?}", thread_id)),
78            created_at: timestamp,
79            allocation_count: 0,
80            total_allocated: 0,
81            peak_memory: 0,
82            is_active: true,
83        });
84        Ok(hash)
85    }
86
87    /// Get thread info by hash
88    pub fn get_thread_info(&self, hash: u64) -> MemScopeResult<Option<ThreadInfo>> {
89        let threads = self.threads.lock().map_err(|e| {
90            MemScopeError::system(
91                crate::core::error::SystemErrorType::Locking,
92                format!("Failed to acquire threads lock: {}", e),
93            )
94        })?;
95        Ok(threads.get(&hash).cloned())
96    }
97
98    /// Record an allocation for a thread
99    pub fn record_allocation(&self, hash: u64, size: usize) -> MemScopeResult<()> {
100        let mut threads = self.threads.lock().map_err(|e| {
101            MemScopeError::system(
102                crate::core::error::SystemErrorType::Locking,
103                format!("Failed to acquire threads lock: {}", e),
104            )
105        })?;
106        if let Some(info) = threads.get_mut(&hash) {
107            info.allocation_count += 1;
108            info.total_allocated += size;
109        }
110        Ok(())
111    }
112
113    /// Update peak memory for a thread
114    pub fn update_peak_memory(&self, hash: u64, current_memory: usize) -> MemScopeResult<()> {
115        let mut threads = self.threads.lock().map_err(|e| {
116            MemScopeError::system(
117                crate::core::error::SystemErrorType::Locking,
118                format!("Failed to acquire threads lock: {}", e),
119            )
120        })?;
121        if let Some(info) = threads.get_mut(&hash) {
122            if current_memory > info.peak_memory {
123                info.peak_memory = current_memory;
124            }
125        }
126        Ok(())
127    }
128
129    /// Mark a thread as inactive
130    pub fn mark_thread_inactive(&self, hash: u64) -> MemScopeResult<()> {
131        let mut threads = self.threads.lock().map_err(|e| {
132            MemScopeError::system(
133                crate::core::error::SystemErrorType::Locking,
134                format!("Failed to acquire threads lock: {}", e),
135            )
136        })?;
137        if let Some(info) = threads.get_mut(&hash) {
138            info.is_active = false;
139        }
140        Ok(())
141    }
142
143    /// Get all threads
144    pub fn get_all_threads(&self) -> MemScopeResult<Vec<ThreadInfo>> {
145        let threads = self.threads.lock().map_err(|e| {
146            MemScopeError::system(
147                crate::core::error::SystemErrorType::Locking,
148                format!("Failed to acquire threads lock: {}", e),
149            )
150        })?;
151        Ok(threads.values().cloned().collect())
152    }
153
154    /// Get active threads only
155    pub fn get_active_threads(&self) -> MemScopeResult<Vec<ThreadInfo>> {
156        let threads = self.threads.lock().map_err(|e| {
157            MemScopeError::system(
158                crate::core::error::SystemErrorType::Locking,
159                format!("Failed to acquire threads lock: {}", e),
160            )
161        })?;
162        Ok(threads.values().filter(|t| t.is_active).cloned().collect())
163    }
164
165    /// Get the number of registered threads
166    pub fn len(&self) -> MemScopeResult<usize> {
167        let threads = self.threads.lock().map_err(|e| {
168            MemScopeError::system(
169                crate::core::error::SystemErrorType::Locking,
170                format!("Failed to acquire threads lock: {}", e),
171            )
172        })?;
173        Ok(threads.len())
174    }
175
176    /// Check if the registry is empty
177    pub fn is_empty(&self) -> MemScopeResult<bool> {
178        Ok(self.len()? == 0)
179    }
180
181    /// Hash a thread ID to a u64
182    fn hash_thread_id(&self, thread_id: &std::thread::ThreadId) -> u64 {
183        crate::utils::thread_id_to_u64(*thread_id)
184    }
185}
186
187impl Default for ThreadRegistry {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
#[cfg(test)]
mod tests {
    //! Unit tests for `ThreadRegistry`. Every test builds its own registry, so
    //! the tests are independent of each other and of execution order.

    use super::*;

    /// A fresh registry holds no threads.
    #[test]
    fn test_thread_registry_creation() {
        let registry = ThreadRegistry::new();
        assert!(registry.is_empty().unwrap());
    }

    /// The internal ID counter starts at 1 and advances by one per call.
    #[test]
    fn test_next_id_is_sequential() {
        let registry = ThreadRegistry::new();
        assert_eq!(registry.next_id().unwrap(), 1);
        assert_eq!(registry.next_id().unwrap(), 2);
    }

    /// Registering the calling thread creates exactly one active record.
    #[test]
    fn test_register_current_thread() {
        let registry = ThreadRegistry::new();
        let hash = registry.register_current_thread().unwrap();
        assert!(hash > 0);
        assert_eq!(registry.len().unwrap(), 1);

        let info = registry.get_thread_info(hash).unwrap();
        assert!(info.is_some());
        assert!(info.unwrap().is_active);
    }

    /// Each recorded allocation bumps the count and adds to the byte total.
    #[test]
    fn test_record_allocation() {
        let registry = ThreadRegistry::new();
        let hash = registry.register_current_thread().unwrap();

        registry.record_allocation(hash, 100).unwrap();
        registry.record_allocation(hash, 200).unwrap();

        let info = registry.get_thread_info(hash).unwrap().unwrap();
        assert_eq!(info.allocation_count, 2);
        assert_eq!(info.total_allocated, 300);
    }

    /// The stored peak only ever moves upwards.
    #[test]
    fn test_update_peak_memory() {
        let registry = ThreadRegistry::new();
        let hash = registry.register_current_thread().unwrap();

        registry.update_peak_memory(hash, 100).unwrap();
        registry.update_peak_memory(hash, 200).unwrap();
        registry.update_peak_memory(hash, 150).unwrap();

        let info = registry.get_thread_info(hash).unwrap().unwrap();
        assert_eq!(info.peak_memory, 200);
    }

    /// Marking a thread inactive clears its flag but keeps the record.
    #[test]
    fn test_mark_thread_inactive() {
        let registry = ThreadRegistry::new();
        let hash = registry.register_current_thread().unwrap();

        registry.mark_thread_inactive(hash).unwrap();

        let info = registry.get_thread_info(hash).unwrap().unwrap();
        assert!(!info.is_active);
    }

    /// `get_active_threads` lists a thread only while it is active, whereas
    /// `get_all_threads` keeps listing it afterwards.
    #[test]
    fn test_get_active_threads() {
        let registry = ThreadRegistry::new();
        let hash = registry.register_current_thread().unwrap();

        let active = registry.get_active_threads().unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].thread_id, hash);

        registry.mark_thread_inactive(hash).unwrap();

        assert!(registry.get_active_threads().unwrap().is_empty());
        assert_eq!(registry.get_all_threads().unwrap().len(), 1);
    }

    /// Updates addressed to a key that was never registered succeed as no-ops
    /// and do not create a record.
    #[test]
    fn test_unknown_thread_is_ignored() {
        let registry = ThreadRegistry::new();

        registry.record_allocation(42, 10).unwrap();
        registry.update_peak_memory(42, 10).unwrap();
        registry.mark_thread_inactive(42).unwrap();

        assert!(registry.is_empty().unwrap());
        assert!(registry.get_thread_info(42).unwrap().is_none());
    }
}