1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
//! Atomic index update manager with file locking
use crate::core::registry_index::{get_skill_index_path, ScopedSkillName, VersionEntry};
use crate::core::service::ServiceError;
use fs2::FileExt;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::{Duration, Instant, SystemTime};
use tracing::{info, warn};
/// Index manager for atomic updates with file locking
///
/// Serializes updates to per-skill index files (located under `registry_path`) using an
/// exclusive OS file lock, and replaces index files by writing a temporary file and renaming
/// it over the target.
#[derive(Debug)]
pub struct IndexManager {
    /// Base path to the registry index directory
    registry_path: PathBuf,
    /// Maximum time to wait for file lock acquisition (30 seconds when built with `new`)
    lock_timeout: Duration,
    /// Last observed mtime/size per index file, used to detect modifications made outside
    /// this manager. Behind a `Mutex` so it can be updated through `&self`.
    file_metadata: Mutex<std::collections::HashMap<PathBuf, IndexFileMetadata>>,
}
/// Snapshot of an index file's on-disk state, compared on later updates to notice
/// changes that were made by someone other than this manager.
#[derive(Clone, Debug)]
struct IndexFileMetadata {
    /// Last-modified timestamp as reported by the filesystem
    mtime: SystemTime,
    /// Length of the file, in bytes
    size: u64,
}
impl IndexManager {
/// Create a new IndexManager instance
///
/// # Arguments
/// * `registry_path` - Base path to the registry index directory
///
/// # Returns
/// Configured IndexManager instance with default 30-second lock timeout
pub fn new(registry_path: PathBuf) -> Self {
Self {
registry_path,
lock_timeout: Duration::from_secs(30),
file_metadata: Mutex::new(std::collections::HashMap::new()),
}
}
/// Atomically update the index file for a skill version
///
/// This method:
/// 1. Normalizes the skill_id using ScopedSkillName::normalize()
/// 2. Checks for duplicate versions
/// 3. Acquires an exclusive file lock with timeout
/// 4. Reads existing entries
/// 5. Appends new entry
/// 6. Writes to temporary file
/// 7. Atomically renames temporary file to target
/// 8. Releases lock
///
/// # Arguments
/// * `skill_id` - The skill identifier (may be scoped, e.g., `@org/package`)
/// * `version` - The version string (e.g., `1.0.0`)
/// * `entry` - The version entry to add to the index
///
/// # Returns
/// `Ok(())` if successful, `Err(ServiceError)` if operation fails
///
/// # Errors
/// - Returns `ServiceError::Custom` if duplicate version is detected
/// - Returns `ServiceError::Custom` if lock timeout is exceeded
/// - Returns `ServiceError::Io` for filesystem errors
pub fn atomic_update(
&self,
skill_id: &str,
version: &str,
entry: &VersionEntry,
) -> Result<(), ServiceError> {
// Step 1: Normalize scoped name
let normalized_id = ScopedSkillName::normalize(skill_id);
info!("Normalized skill_id '{}' to '{}'", skill_id, normalized_id);
// Step 2: Get index file path
let index_path = get_skill_index_path(&self.registry_path, &normalized_id)?;
// Ensure parent directory exists
if let Some(parent) = index_path.parent() {
std::fs::create_dir_all(parent).map_err(ServiceError::Io)?;
}
// Step 3: Check for duplicate version (before acquiring lock)
// Read existing entries if file exists
let existing_entries_before_lock = if index_path.exists() {
Self::read_entries_from_path(&index_path)?
} else {
Vec::new()
};
// Check if version already exists
for existing_entry in &existing_entries_before_lock {
if existing_entry.vers == version {
return Err(ServiceError::Custom(format!(
"Version {} already exists for skill {}",
version, normalized_id
)));
}
}
// Step 4: Acquire exclusive file lock with timeout
let lock_start = Instant::now();
let file = loop {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&index_path)
.map_err(ServiceError::Io)?;
// Try to acquire exclusive lock
match file.try_lock_exclusive() {
Ok(()) => {
let elapsed = lock_start.elapsed();
if elapsed.as_millis() > 0 {
info!(
"Acquired lock on index file: {:?} (waited {}ms)",
index_path,
elapsed.as_millis()
);
} else {
info!("Acquired lock on index file: {:?}", index_path);
}
break file;
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Lock is held by another process, wait and retry
let elapsed = lock_start.elapsed();
if elapsed >= self.lock_timeout {
warn!(
"Lock timeout exceeded for {:?} after {} seconds",
index_path,
self.lock_timeout.as_secs()
);
return Err(ServiceError::Custom(format!(
"Timeout waiting for file lock on {:?} (exceeded {} seconds)",
index_path,
self.lock_timeout.as_secs()
)));
}
// Log if we've been waiting a while
if elapsed.as_secs() > 0 && elapsed.as_secs().is_multiple_of(5) {
info!(
"Waiting for lock on {:?} ({}s elapsed, timeout: {}s)",
index_path,
elapsed.as_secs(),
self.lock_timeout.as_secs()
);
}
// Wait a bit before retrying
std::thread::sleep(Duration::from_millis(100));
continue;
}
Err(e) => {
warn!("Failed to acquire lock on {:?}: {}", index_path, e);
return Err(ServiceError::Io(e));
}
}
};
// Lock will be released when file is dropped
// Use a guard to ensure lock is released even on error
struct LockGuard(File);
impl Drop for LockGuard {
fn drop(&mut self) {
if let Err(e) = self.0.unlock() {
warn!("Failed to release file lock: {}", e);
}
}
}
let _lock_guard = LockGuard(file);
// Step 5: Check for external modifications (before reading)
if index_path.exists() {
let file_metadata = self.file_metadata.lock().map_err(|_| {
ServiceError::Custom(
"Mutex poisoned - another thread panicked while holding the lock".to_string(),
)
})?;
if let Some(prev_metadata) = file_metadata.get(&index_path) {
match std::fs::metadata(&index_path) {
Ok(current_metadata) => {
let current_mtime = current_metadata
.modified()
.unwrap_or(SystemTime::UNIX_EPOCH);
let current_size = current_metadata.len();
if current_mtime != prev_metadata.mtime
|| current_size != prev_metadata.size
{
warn!(
"External modification detected for {:?}: mtime changed from {:?} to {:?}, size changed from {} to {}",
index_path, prev_metadata.mtime, current_mtime, prev_metadata.size, current_size
);
}
}
Err(e) => {
warn!("Failed to read metadata for {:?}: {}", index_path, e);
}
}
}
drop(file_metadata); // Release lock before reading
}
// Step 5: Read existing entries again (after acquiring lock)
let mut existing_entries = if index_path.exists() {
Self::read_entries_from_path(&index_path)?
} else {
Vec::new()
};
// Update tracked metadata after successful read
if index_path.exists() {
if let Ok(metadata) = std::fs::metadata(&index_path) {
if let Ok(mtime) = metadata.modified() {
let mut file_metadata = self.file_metadata.lock().map_err(|_| {
ServiceError::Custom(
"Mutex poisoned - another thread panicked while holding the lock"
.to_string(),
)
})?;
file_metadata.insert(
index_path.clone(),
IndexFileMetadata {
mtime,
size: metadata.len(),
},
);
}
}
}
// Double-check for duplicate (in case it was added between initial check and lock)
for existing_entry in &existing_entries {
if existing_entry.vers == version {
warn!(
"Duplicate version {} detected for skill {} after acquiring lock",
version, normalized_id
);
return Err(ServiceError::Custom(format!(
"Version {} already exists for skill {} (detected after lock)",
version, normalized_id
)));
}
}
info!(
"Updating index for {} v{} ({} existing entries)",
normalized_id,
version,
existing_entries.len()
);
// Step 6: Append new entry
existing_entries.push(entry.clone());
// Step 7: Write to temporary file
let temp_path = index_path.with_extension("tmp");
let mut temp_file = match File::create(&temp_path) {
Ok(file) => file,
Err(e) => {
// Check if error is due to filesystem being full
let error_msg = e.to_string().to_lowercase();
if error_msg.contains("no space")
|| error_msg.contains("filesystem full")
|| e.raw_os_error() == Some(28)
{
warn!(
"Filesystem full: cannot write index file for {} v{}",
normalized_id, version
);
return Err(ServiceError::Custom(format!(
"Filesystem full: cannot update index for {} v{}. Existing index preserved.",
normalized_id, version
)));
}
return Err(ServiceError::Io(e));
}
};
// Write all entries as newline-delimited JSON
for entry in &existing_entries {
let line = serde_json::to_string(entry).map_err(|e| {
ServiceError::Custom(format!("Failed to serialize index entry: {}", e))
})?;
if let Err(e) = writeln!(temp_file, "{}", line) {
// Check if error is due to filesystem being full
let error_msg = e.to_string().to_lowercase();
if error_msg.contains("no space")
|| error_msg.contains("filesystem full")
|| e.raw_os_error() == Some(28)
{
warn!(
"Filesystem full: cannot write index entry for {} v{}",
normalized_id, version
);
// Clean up temp file
let _ = std::fs::remove_file(&temp_path);
return Err(ServiceError::Custom(format!(
"Filesystem full: cannot update index for {} v{}. Existing index preserved.",
normalized_id, version
)));
}
return Err(ServiceError::Io(e));
}
}
if let Err(e) = temp_file.sync_all() {
// Check if error is due to filesystem being full
let error_msg = e.to_string().to_lowercase();
if error_msg.contains("no space")
|| error_msg.contains("filesystem full")
|| e.raw_os_error() == Some(28)
{
warn!(
"Filesystem full: cannot sync index file for {} v{}",
normalized_id, version
);
// Clean up temp file
let _ = std::fs::remove_file(&temp_path);
return Err(ServiceError::Custom(format!(
"Filesystem full: cannot update index for {} v{}. Existing index preserved.",
normalized_id, version
)));
}
return Err(ServiceError::Io(e));
}
drop(temp_file);
// Step 8: Atomically rename temporary file to target
std::fs::rename(&temp_path, &index_path).map_err(|e| {
warn!(
"Failed to atomically rename temp file {:?} to {:?}: {}",
temp_path, index_path, e
);
// If rename fails, try to clean up temp file
let _ = std::fs::remove_file(&temp_path);
ServiceError::Io(e)
})?;
info!(
"Successfully updated index for {} v{} (total {} entries)",
normalized_id,
version,
existing_entries.len()
);
Ok(())
}
/// Read version entries from an index file path
/// Helper function to read directly from a file path (not using registry_path + skill_id)
fn read_entries_from_path(index_path: &PathBuf) -> Result<Vec<VersionEntry>, ServiceError> {
use std::fs;
if !index_path.exists() {
return Ok(Vec::new());
}
let content = fs::read_to_string(index_path).map_err(ServiceError::Io)?;
let mut entries = Vec::new();
// Parse line-by-line (newline-delimited JSON)
for line in content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
match serde_json::from_str::<VersionEntry>(line) {
Ok(entry) => entries.push(entry),
Err(e) => {
// Log error but continue parsing other lines
warn!("Failed to parse index entry: {} (line: {})", e, line);
}
}
}
Ok(entries)
}
}