Skip to main content

fastskill_core/core/registry/
index_manager.rs

1//! Atomic index update manager with file locking
2
3use crate::core::registry_index::{get_skill_index_path, ScopedSkillName, VersionEntry};
4use crate::core::service::ServiceError;
5use fs2::FileExt;
6use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Mutex;
10use std::time::{Duration, Instant, SystemTime};
11use tracing::{info, warn};
12
13/// Index manager for atomic updates with file locking
14pub struct IndexManager {
15    /// Base path to the registry index directory
16    registry_path: PathBuf,
17    /// Maximum time to wait for file lock acquisition (default: 30 seconds)
18    lock_timeout: Duration,
19    /// Tracked file metadata for external modification detection (interior mutability)
20    file_metadata: Mutex<std::collections::HashMap<PathBuf, IndexFileMetadata>>,
21}
22
/// Snapshot of an index file's on-disk state, compared against later snapshots
/// to detect external modification.
#[derive(Clone, Debug)]
struct IndexFileMetadata {
    /// Modification timestamp of the file when the snapshot was taken
    mtime: SystemTime,
    /// Length of the file in bytes when the snapshot was taken
    size: u64,
}
31
32impl IndexManager {
33    /// Create a new IndexManager instance
34    ///
35    /// # Arguments
36    /// * `registry_path` - Base path to the registry index directory
37    ///
38    /// # Returns
39    /// Configured IndexManager instance with default 30-second lock timeout
40    pub fn new(registry_path: PathBuf) -> Self {
41        Self {
42            registry_path,
43            lock_timeout: Duration::from_secs(30),
44            file_metadata: Mutex::new(std::collections::HashMap::new()),
45        }
46    }
47
48    /// Atomically update the index file for a skill version
49    ///
50    /// This method:
51    /// 1. Normalizes the skill_id using ScopedSkillName::normalize()
52    /// 2. Checks for duplicate versions
53    /// 3. Acquires an exclusive file lock with timeout
54    /// 4. Reads existing entries
55    /// 5. Appends new entry
56    /// 6. Writes to temporary file
57    /// 7. Atomically renames temporary file to target
58    /// 8. Releases lock
59    ///
60    /// # Arguments
61    /// * `skill_id` - The skill identifier (may be scoped, e.g., `@org/package`)
62    /// * `version` - The version string (e.g., `1.0.0`)
63    /// * `entry` - The version entry to add to the index
64    ///
65    /// # Returns
66    /// `Ok(())` if successful, `Err(ServiceError)` if operation fails
67    ///
68    /// # Errors
69    /// - Returns `ServiceError::Custom` if duplicate version is detected
70    /// - Returns `ServiceError::Custom` if lock timeout is exceeded
71    /// - Returns `ServiceError::Io` for filesystem errors
72    pub fn atomic_update(
73        &self,
74        skill_id: &str,
75        version: &str,
76        entry: &VersionEntry,
77    ) -> Result<(), ServiceError> {
78        // Step 1: Normalize scoped name
79        let normalized_id = ScopedSkillName::normalize(skill_id);
80        info!("Normalized skill_id '{}' to '{}'", skill_id, normalized_id);
81
82        // Step 2: Get index file path
83        let index_path = get_skill_index_path(&self.registry_path, &normalized_id)?;
84
85        // Ensure parent directory exists
86        if let Some(parent) = index_path.parent() {
87            std::fs::create_dir_all(parent).map_err(ServiceError::Io)?;
88        }
89
90        // Step 3: Check for duplicate version (before acquiring lock)
91        // Read existing entries if file exists
92        let existing_entries_before_lock = if index_path.exists() {
93            Self::read_entries_from_path(&index_path)?
94        } else {
95            Vec::new()
96        };
97
98        // Check if version already exists
99        for existing_entry in &existing_entries_before_lock {
100            if existing_entry.vers == version {
101                return Err(ServiceError::Custom(format!(
102                    "Version {} already exists for skill {}",
103                    version, normalized_id
104                )));
105            }
106        }
107
108        // Step 4: Acquire exclusive file lock with timeout
109        let lock_start = Instant::now();
110        let file = loop {
111            let file = OpenOptions::new()
112                .read(true)
113                .write(true)
114                .create(true)
115                .truncate(true)
116                .open(&index_path)
117                .map_err(ServiceError::Io)?;
118
119            // Try to acquire exclusive lock
120            match file.try_lock_exclusive() {
121                Ok(()) => {
122                    let elapsed = lock_start.elapsed();
123                    if elapsed.as_millis() > 0 {
124                        info!(
125                            "Acquired lock on index file: {:?} (waited {}ms)",
126                            index_path,
127                            elapsed.as_millis()
128                        );
129                    } else {
130                        info!("Acquired lock on index file: {:?}", index_path);
131                    }
132                    break file;
133                }
134                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
135                    // Lock is held by another process, wait and retry
136                    let elapsed = lock_start.elapsed();
137                    if elapsed >= self.lock_timeout {
138                        warn!(
139                            "Lock timeout exceeded for {:?} after {} seconds",
140                            index_path,
141                            self.lock_timeout.as_secs()
142                        );
143                        return Err(ServiceError::Custom(format!(
144                            "Timeout waiting for file lock on {:?} (exceeded {} seconds)",
145                            index_path,
146                            self.lock_timeout.as_secs()
147                        )));
148                    }
149                    // Log if we've been waiting a while
150                    if elapsed.as_secs() > 0 && elapsed.as_secs().is_multiple_of(5) {
151                        info!(
152                            "Waiting for lock on {:?} ({}s elapsed, timeout: {}s)",
153                            index_path,
154                            elapsed.as_secs(),
155                            self.lock_timeout.as_secs()
156                        );
157                    }
158                    // Wait a bit before retrying
159                    std::thread::sleep(Duration::from_millis(100));
160                    continue;
161                }
162                Err(e) => {
163                    warn!("Failed to acquire lock on {:?}: {}", index_path, e);
164                    return Err(ServiceError::Io(e));
165                }
166            }
167        };
168
169        // Lock will be released when file is dropped
170        // Use a guard to ensure lock is released even on error
171        struct LockGuard(File);
172        impl Drop for LockGuard {
173            fn drop(&mut self) {
174                if let Err(e) = self.0.unlock() {
175                    warn!("Failed to release file lock: {}", e);
176                }
177            }
178        }
179        let _lock_guard = LockGuard(file);
180
181        // Step 5: Check for external modifications (before reading)
182        if index_path.exists() {
183            let file_metadata = self.file_metadata.lock().map_err(|_| {
184                ServiceError::Custom(
185                    "Mutex poisoned - another thread panicked while holding the lock".to_string(),
186                )
187            })?;
188            if let Some(prev_metadata) = file_metadata.get(&index_path) {
189                match std::fs::metadata(&index_path) {
190                    Ok(current_metadata) => {
191                        let current_mtime = current_metadata
192                            .modified()
193                            .unwrap_or(SystemTime::UNIX_EPOCH);
194                        let current_size = current_metadata.len();
195
196                        if current_mtime != prev_metadata.mtime
197                            || current_size != prev_metadata.size
198                        {
199                            warn!(
200                                "External modification detected for {:?}: mtime changed from {:?} to {:?}, size changed from {} to {}",
201                                index_path, prev_metadata.mtime, current_mtime, prev_metadata.size, current_size
202                            );
203                        }
204                    }
205                    Err(e) => {
206                        warn!("Failed to read metadata for {:?}: {}", index_path, e);
207                    }
208                }
209            }
210            drop(file_metadata); // Release lock before reading
211        }
212
213        // Step 5: Read existing entries again (after acquiring lock)
214        let mut existing_entries = if index_path.exists() {
215            Self::read_entries_from_path(&index_path)?
216        } else {
217            Vec::new()
218        };
219
220        // Update tracked metadata after successful read
221        if index_path.exists() {
222            if let Ok(metadata) = std::fs::metadata(&index_path) {
223                if let Ok(mtime) = metadata.modified() {
224                    let mut file_metadata = self.file_metadata.lock().map_err(|_| {
225                        ServiceError::Custom(
226                            "Mutex poisoned - another thread panicked while holding the lock"
227                                .to_string(),
228                        )
229                    })?;
230                    file_metadata.insert(
231                        index_path.clone(),
232                        IndexFileMetadata {
233                            mtime,
234                            size: metadata.len(),
235                        },
236                    );
237                }
238            }
239        }
240
241        // Double-check for duplicate (in case it was added between initial check and lock)
242        for existing_entry in &existing_entries {
243            if existing_entry.vers == version {
244                warn!(
245                    "Duplicate version {} detected for skill {} after acquiring lock",
246                    version, normalized_id
247                );
248                return Err(ServiceError::Custom(format!(
249                    "Version {} already exists for skill {} (detected after lock)",
250                    version, normalized_id
251                )));
252            }
253        }
254
255        info!(
256            "Updating index for {} v{} ({} existing entries)",
257            normalized_id,
258            version,
259            existing_entries.len()
260        );
261
262        // Step 6: Append new entry
263        existing_entries.push(entry.clone());
264
265        // Step 7: Write to temporary file
266        let temp_path = index_path.with_extension("tmp");
267        let mut temp_file = match File::create(&temp_path) {
268            Ok(file) => file,
269            Err(e) => {
270                // Check if error is due to filesystem being full
271                let error_msg = e.to_string().to_lowercase();
272                if error_msg.contains("no space")
273                    || error_msg.contains("filesystem full")
274                    || e.raw_os_error() == Some(28)
275                {
276                    warn!(
277                        "Filesystem full: cannot write index file for {} v{}",
278                        normalized_id, version
279                    );
280                    return Err(ServiceError::Custom(format!(
281                        "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
282                        normalized_id, version
283                    )));
284                }
285                return Err(ServiceError::Io(e));
286            }
287        };
288
289        // Write all entries as newline-delimited JSON
290        for entry in &existing_entries {
291            let line = serde_json::to_string(entry).map_err(|e| {
292                ServiceError::Custom(format!("Failed to serialize index entry: {}", e))
293            })?;
294            if let Err(e) = writeln!(temp_file, "{}", line) {
295                // Check if error is due to filesystem being full
296                let error_msg = e.to_string().to_lowercase();
297                if error_msg.contains("no space")
298                    || error_msg.contains("filesystem full")
299                    || e.raw_os_error() == Some(28)
300                {
301                    warn!(
302                        "Filesystem full: cannot write index entry for {} v{}",
303                        normalized_id, version
304                    );
305                    // Clean up temp file
306                    let _ = std::fs::remove_file(&temp_path);
307                    return Err(ServiceError::Custom(format!(
308                        "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
309                        normalized_id, version
310                    )));
311                }
312                return Err(ServiceError::Io(e));
313            }
314        }
315
316        if let Err(e) = temp_file.sync_all() {
317            // Check if error is due to filesystem being full
318            let error_msg = e.to_string().to_lowercase();
319            if error_msg.contains("no space")
320                || error_msg.contains("filesystem full")
321                || e.raw_os_error() == Some(28)
322            {
323                warn!(
324                    "Filesystem full: cannot sync index file for {} v{}",
325                    normalized_id, version
326                );
327                // Clean up temp file
328                let _ = std::fs::remove_file(&temp_path);
329                return Err(ServiceError::Custom(format!(
330                    "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
331                    normalized_id, version
332                )));
333            }
334            return Err(ServiceError::Io(e));
335        }
336        drop(temp_file);
337
338        // Step 8: Atomically rename temporary file to target
339        std::fs::rename(&temp_path, &index_path).map_err(|e| {
340            warn!(
341                "Failed to atomically rename temp file {:?} to {:?}: {}",
342                temp_path, index_path, e
343            );
344            // If rename fails, try to clean up temp file
345            let _ = std::fs::remove_file(&temp_path);
346            ServiceError::Io(e)
347        })?;
348
349        info!(
350            "Successfully updated index for {} v{} (total {} entries)",
351            normalized_id,
352            version,
353            existing_entries.len()
354        );
355
356        Ok(())
357    }
358
359    /// Read version entries from an index file path
360    /// Helper function to read directly from a file path (not using registry_path + skill_id)
361    fn read_entries_from_path(index_path: &PathBuf) -> Result<Vec<VersionEntry>, ServiceError> {
362        use std::fs;
363
364        if !index_path.exists() {
365            return Ok(Vec::new());
366        }
367
368        let content = fs::read_to_string(index_path).map_err(ServiceError::Io)?;
369        let mut entries = Vec::new();
370
371        // Parse line-by-line (newline-delimited JSON)
372        for line in content.lines() {
373            let line = line.trim();
374            if line.is_empty() {
375                continue;
376            }
377
378            match serde_json::from_str::<VersionEntry>(line) {
379                Ok(entry) => entries.push(entry),
380                Err(e) => {
381                    // Log error but continue parsing other lines
382                    warn!("Failed to parse index entry: {} (line: {})", e, line);
383                }
384            }
385        }
386
387        Ok(entries)
388    }
389}