fastskill_core/core/registry/
index_manager.rs1use crate::core::registry_index::{get_skill_index_path, ScopedSkillName, VersionEntry};
4use crate::core::service::ServiceError;
5use fs2::FileExt;
6use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Mutex;
10use std::time::{Duration, Instant, SystemTime};
11use tracing::{info, warn};
12
13pub struct IndexManager {
15 registry_path: PathBuf,
17 lock_timeout: Duration,
19 file_metadata: Mutex<std::collections::HashMap<PathBuf, IndexFileMetadata>>,
21}
22
/// Snapshot of an index file's on-disk state at the time it was last observed.
#[derive(Debug, Clone)]
struct IndexFileMetadata {
    /// Last-modified timestamp reported by the filesystem.
    mtime: SystemTime,
    /// File length in bytes.
    size: u64,
}
31
32impl IndexManager {
33 pub fn new(registry_path: PathBuf) -> Self {
41 Self {
42 registry_path,
43 lock_timeout: Duration::from_secs(30),
44 file_metadata: Mutex::new(std::collections::HashMap::new()),
45 }
46 }
47
48 pub fn atomic_update(
73 &self,
74 skill_id: &str,
75 version: &str,
76 entry: &VersionEntry,
77 ) -> Result<(), ServiceError> {
78 let normalized_id = ScopedSkillName::normalize(skill_id);
80 info!("Normalized skill_id '{}' to '{}'", skill_id, normalized_id);
81
82 let index_path = get_skill_index_path(&self.registry_path, &normalized_id)?;
84
85 if let Some(parent) = index_path.parent() {
87 std::fs::create_dir_all(parent).map_err(ServiceError::Io)?;
88 }
89
90 let existing_entries_before_lock = if index_path.exists() {
93 Self::read_entries_from_path(&index_path)?
94 } else {
95 Vec::new()
96 };
97
98 for existing_entry in &existing_entries_before_lock {
100 if existing_entry.vers == version {
101 return Err(ServiceError::Custom(format!(
102 "Version {} already exists for skill {}",
103 version, normalized_id
104 )));
105 }
106 }
107
108 let lock_start = Instant::now();
110 let file = loop {
111 let file = OpenOptions::new()
112 .read(true)
113 .write(true)
114 .create(true)
115 .truncate(true)
116 .open(&index_path)
117 .map_err(ServiceError::Io)?;
118
119 match file.try_lock_exclusive() {
121 Ok(()) => {
122 let elapsed = lock_start.elapsed();
123 if elapsed.as_millis() > 0 {
124 info!(
125 "Acquired lock on index file: {:?} (waited {}ms)",
126 index_path,
127 elapsed.as_millis()
128 );
129 } else {
130 info!("Acquired lock on index file: {:?}", index_path);
131 }
132 break file;
133 }
134 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
135 let elapsed = lock_start.elapsed();
137 if elapsed >= self.lock_timeout {
138 warn!(
139 "Lock timeout exceeded for {:?} after {} seconds",
140 index_path,
141 self.lock_timeout.as_secs()
142 );
143 return Err(ServiceError::Custom(format!(
144 "Timeout waiting for file lock on {:?} (exceeded {} seconds)",
145 index_path,
146 self.lock_timeout.as_secs()
147 )));
148 }
149 if elapsed.as_secs() > 0 && elapsed.as_secs().is_multiple_of(5) {
151 info!(
152 "Waiting for lock on {:?} ({}s elapsed, timeout: {}s)",
153 index_path,
154 elapsed.as_secs(),
155 self.lock_timeout.as_secs()
156 );
157 }
158 std::thread::sleep(Duration::from_millis(100));
160 continue;
161 }
162 Err(e) => {
163 warn!("Failed to acquire lock on {:?}: {}", index_path, e);
164 return Err(ServiceError::Io(e));
165 }
166 }
167 };
168
169 struct LockGuard(File);
172 impl Drop for LockGuard {
173 fn drop(&mut self) {
174 if let Err(e) = self.0.unlock() {
175 warn!("Failed to release file lock: {}", e);
176 }
177 }
178 }
179 let _lock_guard = LockGuard(file);
180
181 if index_path.exists() {
183 let file_metadata = self.file_metadata.lock().map_err(|_| {
184 ServiceError::Custom(
185 "Mutex poisoned - another thread panicked while holding the lock".to_string(),
186 )
187 })?;
188 if let Some(prev_metadata) = file_metadata.get(&index_path) {
189 match std::fs::metadata(&index_path) {
190 Ok(current_metadata) => {
191 let current_mtime = current_metadata
192 .modified()
193 .unwrap_or(SystemTime::UNIX_EPOCH);
194 let current_size = current_metadata.len();
195
196 if current_mtime != prev_metadata.mtime
197 || current_size != prev_metadata.size
198 {
199 warn!(
200 "External modification detected for {:?}: mtime changed from {:?} to {:?}, size changed from {} to {}",
201 index_path, prev_metadata.mtime, current_mtime, prev_metadata.size, current_size
202 );
203 }
204 }
205 Err(e) => {
206 warn!("Failed to read metadata for {:?}: {}", index_path, e);
207 }
208 }
209 }
210 drop(file_metadata); }
212
213 let mut existing_entries = if index_path.exists() {
215 Self::read_entries_from_path(&index_path)?
216 } else {
217 Vec::new()
218 };
219
220 if index_path.exists() {
222 if let Ok(metadata) = std::fs::metadata(&index_path) {
223 if let Ok(mtime) = metadata.modified() {
224 let mut file_metadata = self.file_metadata.lock().map_err(|_| {
225 ServiceError::Custom(
226 "Mutex poisoned - another thread panicked while holding the lock"
227 .to_string(),
228 )
229 })?;
230 file_metadata.insert(
231 index_path.clone(),
232 IndexFileMetadata {
233 mtime,
234 size: metadata.len(),
235 },
236 );
237 }
238 }
239 }
240
241 for existing_entry in &existing_entries {
243 if existing_entry.vers == version {
244 warn!(
245 "Duplicate version {} detected for skill {} after acquiring lock",
246 version, normalized_id
247 );
248 return Err(ServiceError::Custom(format!(
249 "Version {} already exists for skill {} (detected after lock)",
250 version, normalized_id
251 )));
252 }
253 }
254
255 info!(
256 "Updating index for {} v{} ({} existing entries)",
257 normalized_id,
258 version,
259 existing_entries.len()
260 );
261
262 existing_entries.push(entry.clone());
264
265 let temp_path = index_path.with_extension("tmp");
267 let mut temp_file = match File::create(&temp_path) {
268 Ok(file) => file,
269 Err(e) => {
270 let error_msg = e.to_string().to_lowercase();
272 if error_msg.contains("no space")
273 || error_msg.contains("filesystem full")
274 || e.raw_os_error() == Some(28)
275 {
276 warn!(
277 "Filesystem full: cannot write index file for {} v{}",
278 normalized_id, version
279 );
280 return Err(ServiceError::Custom(format!(
281 "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
282 normalized_id, version
283 )));
284 }
285 return Err(ServiceError::Io(e));
286 }
287 };
288
289 for entry in &existing_entries {
291 let line = serde_json::to_string(entry).map_err(|e| {
292 ServiceError::Custom(format!("Failed to serialize index entry: {}", e))
293 })?;
294 if let Err(e) = writeln!(temp_file, "{}", line) {
295 let error_msg = e.to_string().to_lowercase();
297 if error_msg.contains("no space")
298 || error_msg.contains("filesystem full")
299 || e.raw_os_error() == Some(28)
300 {
301 warn!(
302 "Filesystem full: cannot write index entry for {} v{}",
303 normalized_id, version
304 );
305 let _ = std::fs::remove_file(&temp_path);
307 return Err(ServiceError::Custom(format!(
308 "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
309 normalized_id, version
310 )));
311 }
312 return Err(ServiceError::Io(e));
313 }
314 }
315
316 if let Err(e) = temp_file.sync_all() {
317 let error_msg = e.to_string().to_lowercase();
319 if error_msg.contains("no space")
320 || error_msg.contains("filesystem full")
321 || e.raw_os_error() == Some(28)
322 {
323 warn!(
324 "Filesystem full: cannot sync index file for {} v{}",
325 normalized_id, version
326 );
327 let _ = std::fs::remove_file(&temp_path);
329 return Err(ServiceError::Custom(format!(
330 "Filesystem full: cannot update index for {} v{}. Existing index preserved.",
331 normalized_id, version
332 )));
333 }
334 return Err(ServiceError::Io(e));
335 }
336 drop(temp_file);
337
338 std::fs::rename(&temp_path, &index_path).map_err(|e| {
340 warn!(
341 "Failed to atomically rename temp file {:?} to {:?}: {}",
342 temp_path, index_path, e
343 );
344 let _ = std::fs::remove_file(&temp_path);
346 ServiceError::Io(e)
347 })?;
348
349 info!(
350 "Successfully updated index for {} v{} (total {} entries)",
351 normalized_id,
352 version,
353 existing_entries.len()
354 );
355
356 Ok(())
357 }
358
359 fn read_entries_from_path(index_path: &PathBuf) -> Result<Vec<VersionEntry>, ServiceError> {
362 use std::fs;
363
364 if !index_path.exists() {
365 return Ok(Vec::new());
366 }
367
368 let content = fs::read_to_string(index_path).map_err(ServiceError::Io)?;
369 let mut entries = Vec::new();
370
371 for line in content.lines() {
373 let line = line.trim();
374 if line.is_empty() {
375 continue;
376 }
377
378 match serde_json::from_str::<VersionEntry>(line) {
379 Ok(entry) => entries.push(entry),
380 Err(e) => {
381 warn!("Failed to parse index entry: {} (line: {})", e, line);
383 }
384 }
385 }
386
387 Ok(entries)
388 }
389}