//! gity_storage — repository metadata storage.
//!
//! Provides an in-memory `MetadataStore` for tests/bootstrap, a persistent
//! sled-backed store, and the `StorageContext` helper that roots all
//! persisted artifacts under a single data directory.
1use gity_ipc::RepoStatus;
2use serde::{Deserialize, Serialize};
3use std::{
4    collections::HashMap,
5    fs,
6    path::{Path, PathBuf},
7    sync::RwLock,
8    time::{Duration, SystemTime, UNIX_EPOCH},
9};
10use thiserror::Error;
11use tracing::warn;
12
/// Maximum number of dirty paths to track per repository.
/// When this limit is exceeded, the entire repo is marked as dirty (`.`)
/// to avoid unbounded memory growth.
const MAX_DIRTY_PATHS: usize = 10_000;

/// Convenience alias for results produced by storage implementations.
pub type StorageResult<T> = Result<T, StorageError>;
19
/// Errors surfaced by storage implementations.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum StorageError {
    /// The given repository path was never registered (or was removed).
    #[error("repository not registered: {0}")]
    NotFound(String),
    /// An in-memory `RwLock` was poisoned by a panicking writer.
    #[error("internal locking error")]
    Poisoned,
    /// Any error bubbled up from the persistent backend (sled, bincode, I/O),
    /// flattened to its display string.
    #[error("storage backend error: {0}")]
    Backend(String),
}
30
/// Metadata tracked for every registered repository.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RepoMetadata {
    /// Path of the repository on disk; also the key used by the stores.
    pub repo_path: PathBuf,
    /// When the repository was first registered.
    pub registered_at: SystemTime,
    /// Timestamp of the most recent event recorded via `record_event`.
    pub last_event: Option<SystemTime>,
    /// Current activity status; recomputed from `pending_jobs` by
    /// `increment_jobs`.
    pub status: RepoStatus,
    /// Number of outstanding jobs for this repo (saturating, never negative).
    pub pending_jobs: usize,
    /// Paths modified since the last drain; deduplicated and capped at
    /// `MAX_DIRTY_PATHS`, after which it collapses to a single `"."` entry.
    pub dirty_paths: Vec<PathBuf>,
    /// Monotonic counter incremented (saturating) by `bump_generation`.
    pub generation: u64,
    /// Whether a reconciliation pass is pending; `Option` so records
    /// persisted before the field existed deserialize as `None`.
    pub needs_reconciliation: Option<bool>,
    /// Most recent token recorded via `set_watcher_token`.
    pub last_watcher_token: Option<u64>,
}
44
45impl RepoMetadata {
46    pub fn new(repo_path: PathBuf) -> Self {
47        Self {
48            repo_path,
49            registered_at: SystemTime::now(),
50            last_event: None,
51            status: RepoStatus::Idle,
52            pending_jobs: 0,
53            dirty_paths: Vec::new(),
54            generation: 0,
55            needs_reconciliation: Some(false),
56            last_watcher_token: None,
57        }
58    }
59}
60
/// Primary abstraction for storing repository metadata.
///
/// Implementations must be thread-safe (`Send + Sync + 'static`); all
/// mutating methods return `NotFound` when the path was never registered.
pub trait MetadataStore: Send + Sync + 'static {
    /// Registers `repo_path`; if it already exists, returns the current record.
    fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata>;
    /// Removes and returns the record, or `None` if it was never registered.
    fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>>;
    /// Fetches a record without modifying it.
    fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>>;
    /// Returns all registered records (order unspecified).
    fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>>;
    /// Overwrites the repository's status field.
    fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()>;
    /// Adjusts `pending_jobs` by `delta` (saturating at zero), recomputes
    /// `status` from the new count, and returns the updated record.
    fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata>;
    /// Records `when` as the repository's most recent event timestamp.
    fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()>;
    /// Adds `path` to the dirty set (deduplicated, capped at `MAX_DIRTY_PATHS`).
    fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()>;
    /// Removes and returns all dirty paths, leaving the set empty.
    fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>>;
    /// Number of dirty paths currently tracked.
    fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize>;
    /// Current value of the generation counter.
    fn current_generation(&self, repo_path: &Path) -> StorageResult<u64>;
    /// Increments (saturating) the generation counter and returns the new value.
    fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64>;
    /// Sets the reconciliation-needed flag.
    fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()>;
    /// Records the most recent watcher token.
    fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()>;
}
78
/// In-memory `MetadataStore` used in tests and the current bootstrap binary.
pub struct InMemoryMetadataStore {
    // One RwLock over the whole map: concurrent readers, exclusive writers.
    inner: RwLock<HashMap<PathBuf, RepoMetadata>>,
}
83
84impl InMemoryMetadataStore {
85    pub fn new() -> Self {
86        Self {
87            inner: RwLock::new(HashMap::new()),
88        }
89    }
90}
91
impl Default for InMemoryMetadataStore {
    /// Equivalent to [`InMemoryMetadataStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
97
98impl MetadataStore for InMemoryMetadataStore {
99    fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata> {
100        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
101        let entry = guard
102            .entry(repo_path.clone())
103            .or_insert_with(|| RepoMetadata::new(repo_path));
104        Ok(entry.clone())
105    }
106
107    fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
108        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
109        Ok(guard.remove(repo_path))
110    }
111
112    fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
113        let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
114        Ok(guard.get(repo_path).cloned())
115    }
116
117    fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>> {
118        let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
119        Ok(guard.values().cloned().collect())
120    }
121
122    fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()> {
123        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
124        let entry = guard
125            .get_mut(repo_path)
126            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
127        entry.status = status;
128        Ok(())
129    }
130
131    fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata> {
132        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
133        let entry = guard
134            .get_mut(repo_path)
135            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
136        if delta >= 0 {
137            entry.pending_jobs = entry.pending_jobs.saturating_add(delta as usize);
138        } else {
139            entry.pending_jobs = entry.pending_jobs.saturating_sub(delta.unsigned_abs());
140        }
141        entry.status = job_status_for(entry.pending_jobs);
142        Ok(entry.clone())
143    }
144
145    fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()> {
146        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
147        let entry = guard
148            .get_mut(repo_path)
149            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
150        entry.last_event = Some(when);
151        Ok(())
152    }
153
154    fn current_generation(&self, repo_path: &Path) -> StorageResult<u64> {
155        let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
156        let entry = guard
157            .get(repo_path)
158            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
159        Ok(entry.generation)
160    }
161
162    fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64> {
163        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
164        let entry = guard
165            .get_mut(repo_path)
166            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
167        entry.generation = entry.generation.saturating_add(1);
168        Ok(entry.generation)
169    }
170
171    fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()> {
172        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
173        let entry = guard
174            .get_mut(repo_path)
175            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
176
177        // Check if we've exceeded the dirty_paths limit
178        if entry.dirty_paths.len() >= MAX_DIRTY_PATHS {
179            // Already at limit - if first entry isn't ".", replace with "." to mark entire repo dirty
180            if entry.dirty_paths.first() != Some(&PathBuf::from(".")) {
181                warn!(
182                    repo = %repo_path.display(),
183                    "dirty_paths limit ({}) exceeded, marking entire repo dirty",
184                    MAX_DIRTY_PATHS
185                );
186                entry.dirty_paths.clear();
187                entry.dirty_paths.push(PathBuf::from("."));
188            }
189            return Ok(());
190        }
191
192        if !entry.dirty_paths.contains(&path) {
193            entry.dirty_paths.push(path);
194        }
195        Ok(())
196    }
197
198    fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>> {
199        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
200        let entry = guard
201            .get_mut(repo_path)
202            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
203        Ok(std::mem::take(&mut entry.dirty_paths))
204    }
205
206    fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize> {
207        let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
208        let entry = guard
209            .get(repo_path)
210            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
211        Ok(entry.dirty_paths.len())
212    }
213
214    fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()> {
215        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
216        let entry = guard
217            .get_mut(repo_path)
218            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
219        entry.needs_reconciliation = Some(needs);
220        Ok(())
221    }
222
223    fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()> {
224        let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
225        let entry = guard
226            .get_mut(repo_path)
227            .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
228        entry.last_watcher_token = Some(token);
229        Ok(())
230    }
231}
232
/// Persistent sled-backed metadata store.
///
/// Cloning is cheap: `sled::Tree` is a shared handle to the same database.
#[derive(Clone)]
pub struct SledMetadataStore {
    // The `repos` tree: path-key -> bincode-encoded `RepoRecord`.
    tree: sled::Tree,
}
238
impl SledMetadataStore {
    /// Opens (or creates) the sled database at `path` and its `repos` tree.
    ///
    /// NOTE(review): the `sled::Db` handle is dropped here; the returned
    /// `Tree` is assumed to keep the underlying database alive — confirm
    /// against the pinned sled version's documentation.
    pub fn open(path: impl AsRef<Path>) -> StorageResult<Self> {
        let db = sled::open(path).map_err(map_sled_err)?;
        let tree = db.open_tree("repos").map_err(map_sled_err)?;
        Ok(Self { tree })
    }

    /// Loads and decodes the record for `repo_path`, or `NotFound`.
    fn load_repo(&self, repo_path: &Path) -> StorageResult<RepoMetadata> {
        let key = repo_key(repo_path);
        let Some(bytes) = self.tree.get(&key).map_err(map_sled_err)? else {
            return Err(StorageError::NotFound(repo_path.display().to_string()));
        };
        deserialize_record(bytes.as_ref())
    }

    /// Serializes `metadata` and upserts it under its path key.
    fn write_repo(&self, metadata: &RepoMetadata) -> StorageResult<()> {
        let key = repo_key(&metadata.repo_path);
        let record = RepoRecord::from(metadata);
        let bytes =
            bincode::serialize(&record).map_err(|err| StorageError::Backend(err.to_string()))?;
        self.tree.insert(key, bytes).map_err(map_sled_err)?;
        Ok(())
    }

    /// Read-modify-write helper: load, apply `mutator`, persist, and return
    /// the updated record.
    ///
    /// Not atomic: a concurrent writer between the load and the write is
    /// silently overwritten (last write wins).
    fn update_repo<F>(&self, repo_path: &Path, mutator: F) -> StorageResult<RepoMetadata>
    where
        F: FnOnce(&mut RepoMetadata),
    {
        let mut current = self.load_repo(repo_path)?;
        mutator(&mut current);
        self.write_repo(&current)?;
        Ok(current)
    }
}
273
274impl MetadataStore for SledMetadataStore {
275    fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata> {
276        let key = repo_key(&repo_path);
277        if let Some(existing) = self.tree.get(&key).map_err(map_sled_err)? {
278            return deserialize_record(existing.as_ref());
279        }
280        let metadata = RepoMetadata::new(repo_path);
281        self.write_repo(&metadata)?;
282        Ok(metadata)
283    }
284
285    fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
286        let key = repo_key(repo_path);
287        let result = self.tree.remove(&key).map_err(map_sled_err)?;
288        Ok(match result {
289            Some(bytes) => Some(deserialize_record(bytes.as_ref())?),
290            None => None,
291        })
292    }
293
294    fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
295        let key = repo_key(repo_path);
296        match self.tree.get(&key).map_err(map_sled_err)? {
297            Some(bytes) => Ok(Some(deserialize_record(bytes.as_ref())?)),
298            None => Ok(None),
299        }
300    }
301
302    fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>> {
303        let mut repos = Vec::new();
304        for entry in self.tree.iter() {
305            let (_, value) = entry.map_err(map_sled_err)?;
306            repos.push(deserialize_record(value.as_ref())?);
307        }
308        Ok(repos)
309    }
310
311    fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()> {
312        self.update_repo(repo_path, |meta| meta.status = status)?;
313        Ok(())
314    }
315
316    fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata> {
317        self.update_repo(repo_path, |meta| {
318            if delta >= 0 {
319                meta.pending_jobs = meta.pending_jobs.saturating_add(delta as usize);
320            } else {
321                meta.pending_jobs = meta.pending_jobs.saturating_sub(delta.unsigned_abs());
322            }
323            meta.status = job_status_for(meta.pending_jobs);
324        })
325    }
326
327    fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()> {
328        self.update_repo(repo_path, |meta| meta.last_event = Some(when))?;
329        Ok(())
330    }
331
332    fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()> {
333        self.update_repo(repo_path, |meta| {
334            // Check if we've exceeded the dirty_paths limit
335            if meta.dirty_paths.len() >= MAX_DIRTY_PATHS {
336                // Already at limit - if first entry isn't ".", replace with "." to mark entire repo dirty
337                if meta.dirty_paths.first() != Some(&PathBuf::from(".")) {
338                    warn!(
339                        repo = %repo_path.display(),
340                        "dirty_paths limit ({}) exceeded, marking entire repo dirty",
341                        MAX_DIRTY_PATHS
342                    );
343                    meta.dirty_paths.clear();
344                    meta.dirty_paths.push(PathBuf::from("."));
345                }
346                return;
347            }
348
349            if !meta.dirty_paths.contains(&path) {
350                meta.dirty_paths.push(path);
351            }
352        })?;
353        Ok(())
354    }
355
356    fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>> {
357        let mut removed = Vec::new();
358        self.update_repo(repo_path, |meta| {
359            removed = std::mem::take(&mut meta.dirty_paths);
360        })?;
361        Ok(removed)
362    }
363
364    fn current_generation(&self, repo_path: &Path) -> StorageResult<u64> {
365        self.load_repo(repo_path).map(|meta| meta.generation)
366    }
367
368    fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64> {
369        let mut generation = 0;
370        self.update_repo(repo_path, |meta| {
371            meta.generation = meta.generation.saturating_add(1);
372            generation = meta.generation;
373        })?;
374        Ok(generation)
375    }
376
377    fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize> {
378        self.load_repo(repo_path).map(|meta| meta.dirty_paths.len())
379    }
380
381    fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()> {
382        self.update_repo(repo_path, |meta| {
383            meta.needs_reconciliation = Some(needs);
384        })?;
385        Ok(())
386    }
387
388    fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()> {
389        self.update_repo(repo_path, |meta| {
390            meta.last_watcher_token = Some(token);
391        })?;
392        Ok(())
393    }
394}
395
/// Serializes a repository path into the byte key used by the sled trees.
/// Non-UTF-8 components are replaced lossily (same as `to_string_lossy`).
fn repo_key(path: &Path) -> Vec<u8> {
    let text = path.to_string_lossy().into_owned();
    text.into_bytes()
}
399
400fn job_status_for(pending: usize) -> RepoStatus {
401    if pending > 0 {
402        RepoStatus::Busy
403    } else {
404        RepoStatus::Idle
405    }
406}
407
/// On-disk representation of `RepoMetadata` for the sled backend.
///
/// Times are flattened to epoch seconds (`u64`) before bincode encoding.
/// NOTE(review): bincode is not self-describing, so `#[serde(default)]`
/// may not recover records written without the trailing optional fields —
/// verify the migration path for pre-existing databases.
#[derive(Serialize, Deserialize)]
struct RepoRecord {
    repo_path: PathBuf,
    // Epoch seconds; see `encode_time` / `decode_time`.
    registered_at: u64,
    last_event: Option<u64>,
    status: RepoStatus,
    pending_jobs: usize,
    dirty_paths: Vec<PathBuf>,
    generation: u64,
    // Optional fields default to `None` when absent from the payload.
    #[serde(default)]
    needs_reconciliation: Option<bool>,
    #[serde(default)]
    last_watcher_token: Option<u64>,
}
422
impl From<&RepoMetadata> for RepoRecord {
    /// Converts in-memory metadata to its persisted form, flattening
    /// `SystemTime` values to epoch seconds (sub-second precision is lost).
    fn from(value: &RepoMetadata) -> Self {
        Self {
            repo_path: value.repo_path.clone(),
            registered_at: encode_time(value.registered_at),
            last_event: value.last_event.map(encode_time),
            status: value.status.clone(),
            pending_jobs: value.pending_jobs,
            dirty_paths: value.dirty_paths.clone(),
            generation: value.generation,
            needs_reconciliation: value.needs_reconciliation,
            last_watcher_token: value.last_watcher_token,
        }
    }
}
438
impl From<RepoRecord> for RepoMetadata {
    /// Rehydrates a persisted record, expanding epoch seconds back into
    /// `SystemTime` values.
    fn from(value: RepoRecord) -> Self {
        Self {
            repo_path: value.repo_path,
            registered_at: decode_time(value.registered_at),
            last_event: value.last_event.map(decode_time),
            status: value.status,
            pending_jobs: value.pending_jobs,
            dirty_paths: value.dirty_paths,
            generation: value.generation,
            needs_reconciliation: value.needs_reconciliation,
            last_watcher_token: value.last_watcher_token,
        }
    }
}
454
/// Encodes a `SystemTime` as whole seconds since the Unix epoch.
/// Pre-epoch times clamp to zero; sub-second precision is dropped.
fn encode_time(time: SystemTime) -> u64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
460
/// Inverse of `encode_time`: rebuilds a `SystemTime` from epoch seconds.
/// Panics (like the original `+`) if the addition overflows `SystemTime`.
fn decode_time(secs: u64) -> SystemTime {
    UNIX_EPOCH
        .checked_add(Duration::from_secs(secs))
        .expect("epoch + seconds overflowed SystemTime")
}
464
/// Decodes a bincode-serialized `RepoRecord` and lifts it into `RepoMetadata`,
/// flattening decode failures into `StorageError::Backend`.
fn deserialize_record(bytes: &[u8]) -> StorageResult<RepoMetadata> {
    let record: RepoRecord =
        bincode::deserialize(bytes).map_err(|err| StorageError::Backend(err.to_string()))?;
    Ok(record.into())
}
470
471fn map_sled_err<E: std::fmt::Display>(err: E) -> StorageError {
472    StorageError::Backend(err.to_string())
473}
474
/// Helper that aligns all persisted artifacts (sled, future caches) under a
/// single directory.
#[derive(Debug, Clone)]
pub struct StorageContext {
    // `<data_root>/sled` — metadata database directory.
    metadata_path: PathBuf,
    // `<data_root>/logs` — log database directory.
    log_path: PathBuf,
}
482
/// Statistics about the database storage, as reported by [`StorageContext::stats`].
#[derive(Debug, Clone)]
pub struct DbStats {
    /// Total size of metadata database directory in bytes (recursive).
    pub metadata_size_bytes: u64,
    /// Total size of logs database directory in bytes (recursive).
    pub logs_size_bytes: u64,
    /// Number of registered repositories.
    pub repo_count: usize,
    /// Total number of log entries in the `logs` tree.
    pub log_entry_count: usize,
}
495
496impl StorageContext {
497    /// Creates the metadata directory (if missing) beneath `data_root`.
498    pub fn new(data_root: impl AsRef<Path>) -> StorageResult<Self> {
499        let metadata_path = data_root.as_ref().join("sled");
500        let log_path = data_root.as_ref().join("logs");
501        fs::create_dir_all(&metadata_path).map_err(map_sled_err)?;
502        fs::create_dir_all(&log_path).map_err(map_sled_err)?;
503        Ok(Self {
504            metadata_path,
505            log_path,
506        })
507    }
508
509    /// Returns a sled-backed metadata store rooted at this context's path.
510    pub fn metadata_store(&self) -> StorageResult<SledMetadataStore> {
511        SledMetadataStore::open(&self.metadata_path)
512    }
513
514    pub fn log_tree(&self) -> StorageResult<sled::Tree> {
515        let db = sled::open(&self.log_path).map_err(map_sled_err)?;
516        db.open_tree("logs").map_err(map_sled_err)
517    }
518
519    pub fn metadata_path(&self) -> &Path {
520        &self.metadata_path
521    }
522
523    pub fn log_path(&self) -> &Path {
524        &self.log_path
525    }
526
527    /// Compact both metadata and logs databases to reclaim space.
528    pub fn compact_all(&self) -> StorageResult<()> {
529        // Open and flush metadata database
530        let meta_db = sled::open(&self.metadata_path).map_err(map_sled_err)?;
531        meta_db.flush().map_err(map_sled_err)?;
532
533        // Open and flush logs database
534        let log_db = sled::open(&self.log_path).map_err(map_sled_err)?;
535        log_db.flush().map_err(map_sled_err)?;
536
537        Ok(())
538    }
539
540    /// Get statistics about the database storage.
541    pub fn stats(&self) -> StorageResult<DbStats> {
542        let metadata_size_bytes = dir_size(&self.metadata_path);
543        let logs_size_bytes = dir_size(&self.log_path);
544
545        let store = self.metadata_store()?;
546        let repo_count = store.list_repos()?.len();
547
548        let log_tree = self.log_tree()?;
549        let log_entry_count = log_tree.len();
550
551        Ok(DbStats {
552            metadata_size_bytes,
553            logs_size_bytes,
554            repo_count,
555            log_entry_count,
556        })
557    }
558
559    /// Prune log entries older than the specified duration.
560    /// Returns the number of entries pruned.
561    pub fn prune_old_log_entries(&self, max_age: Duration) -> StorageResult<usize> {
562        let log_tree = self.log_tree()?;
563        let cutoff = SystemTime::now()
564            .checked_sub(max_age)
565            .unwrap_or(UNIX_EPOCH);
566        let cutoff_nanos = cutoff
567            .duration_since(UNIX_EPOCH)
568            .unwrap_or_else(|_| Duration::from_secs(0))
569            .as_nanos();
570
571        let mut pruned = 0;
572        let keys_to_remove: Vec<_> = log_tree
573            .iter()
574            .filter_map(|result| result.ok())
575            .filter_map(|(key, _)| {
576                // Log key format: repo_path_bytes + 0x00 + timestamp_nanos_be_bytes (u128)
577                // The timestamp is in the last 16 bytes of the key
578                if key.len() < 17 {
579                    return None; // Invalid key format
580                }
581                let ts_bytes: [u8; 16] = key[key.len() - 16..].try_into().ok()?;
582                let timestamp_nanos = u128::from_be_bytes(ts_bytes);
583                if timestamp_nanos < cutoff_nanos {
584                    Some(key)
585                } else {
586                    None
587                }
588            })
589            .collect();
590
591        for key in keys_to_remove {
592            if log_tree.remove(&key).is_ok() {
593                pruned += 1;
594            }
595        }
596
597        // Flush after pruning
598        if pruned > 0 {
599            let db = sled::open(&self.log_path).map_err(map_sled_err)?;
600            db.flush().map_err(map_sled_err)?;
601        }
602
603        Ok(pruned)
604    }
605}
606
/// Calculate total size in bytes of a directory tree, recursively.
///
/// Unreadable entries/directories are silently skipped (count as zero).
/// Fix: uses `DirEntry::file_type`, which does not follow symlinks, so a
/// symlinked directory is no longer traversed — the old `path.is_dir()`
/// check followed links, risking cycles and double counting.
fn dir_size(path: &Path) -> u64 {
    let mut total = 0;
    if let Ok(entries) = fs::read_dir(path) {
        for entry in entries.flatten() {
            let Ok(file_type) = entry.file_type() else {
                continue;
            };
            if file_type.is_dir() {
                total += dir_size(&entry.path());
            } else if file_type.is_file() {
                if let Ok(metadata) = entry.metadata() {
                    total += metadata.len();
                }
            }
        }
    }
    total
}
622
#[cfg(test)]
mod tests {
    //! Unit tests covering both `MetadataStore` implementations and the
    //! `StorageContext` directory helper. Sled tests use `tempfile` so each
    //! run gets an isolated database directory.
    use super::*;
    use std::time::UNIX_EPOCH;

    #[test]
    fn register_and_list_repositories() {
        let store = InMemoryMetadataStore::new();
        store
            .register_repo(PathBuf::from("/tmp/demo"))
            .expect("register");
        let repos = store.list_repos().expect("list");
        assert_eq!(repos.len(), 1);
        assert_eq!(repos[0].repo_path, PathBuf::from("/tmp/demo"));
    }

    #[test]
    fn unregister_repository() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        let removed = store.unregister_repo(&path).unwrap();
        assert!(removed.is_some());
        // Second unregister is a no-op returning None.
        assert!(store.unregister_repo(&path).unwrap().is_none());
    }

    #[test]
    fn job_counters_do_not_underflow() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        store.increment_jobs(&path, 5).unwrap();
        // Decrementing past zero saturates instead of wrapping.
        let snapshot = store.increment_jobs(&path, -10).unwrap();
        assert_eq!(snapshot.pending_jobs, 0);
    }

    #[test]
    fn in_memory_job_status_tracks_pending_jobs() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        let initial = store.register_repo(path.clone()).unwrap();
        assert_eq!(initial.status, RepoStatus::Idle);
        let snapshot = store.increment_jobs(&path, 1).unwrap();
        assert_eq!(snapshot.status, RepoStatus::Busy);
        let snapshot = store.increment_jobs(&path, -1).unwrap();
        assert_eq!(snapshot.status, RepoStatus::Idle);
    }

    #[test]
    fn record_last_event() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        store.record_event(&path, UNIX_EPOCH).unwrap();
        let repos = store.list_repos().unwrap();
        assert_eq!(repos[0].last_event, Some(UNIX_EPOCH));
    }

    #[test]
    fn sled_store_persists_between_instances() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("db");
        // First instance writes, then is dropped (scope end).
        {
            let store = SledMetadataStore::open(&db_path).unwrap();
            store
                .register_repo(PathBuf::from("/tmp/demo"))
                .expect("register");
        }
        // Second instance must see the persisted record.
        {
            let store = SledMetadataStore::open(&db_path).unwrap();
            let repos = store.list_repos().unwrap();
            assert_eq!(repos.len(), 1);
            assert_eq!(repos[0].repo_path, PathBuf::from("/tmp/demo"));
        }
    }

    #[test]
    fn sled_store_updates_jobs() {
        let dir = tempfile::tempdir().unwrap();
        let store = SledMetadataStore::open(dir.path()).unwrap();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        store.increment_jobs(&path, 2).unwrap();
        let snapshot = store.increment_jobs(&path, -1).unwrap();
        assert_eq!(snapshot.pending_jobs, 1);
    }

    #[test]
    fn sled_job_status_tracks_pending_jobs() {
        let dir = tempfile::tempdir().unwrap();
        let store = SledMetadataStore::open(dir.path()).unwrap();
        let path = PathBuf::from("/tmp/demo");
        let initial = store.register_repo(path.clone()).unwrap();
        assert_eq!(initial.status, RepoStatus::Idle);
        let snapshot = store.increment_jobs(&path, 1).unwrap();
        assert_eq!(snapshot.status, RepoStatus::Busy);
        let snapshot = store.increment_jobs(&path, -1).unwrap();
        assert_eq!(snapshot.status, RepoStatus::Idle);
    }

    #[test]
    fn dirty_paths_track_changes_in_memory() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        // Marking the same path twice must deduplicate.
        store
            .mark_dirty_path(&path, PathBuf::from("file.txt"))
            .unwrap();
        store
            .mark_dirty_path(&path, PathBuf::from("file.txt"))
            .unwrap();
        let dirty = store.drain_dirty_paths(&path).unwrap();
        assert_eq!(dirty, vec![PathBuf::from("file.txt")]);
        // Draining empties the set.
        assert!(store.drain_dirty_paths(&path).unwrap().is_empty());
    }

    #[test]
    fn dirty_paths_persist_in_sled() {
        let dir = tempfile::tempdir().unwrap();
        let store = SledMetadataStore::open(dir.path()).unwrap();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        store
            .mark_dirty_path(&path, PathBuf::from("a.txt"))
            .unwrap();
        let drained = store.drain_dirty_paths(&path).unwrap();
        assert_eq!(drained, vec![PathBuf::from("a.txt")]);
    }

    #[test]
    fn generation_counters_increment() {
        let store = InMemoryMetadataStore::new();
        let path = PathBuf::from("/tmp/demo");
        store.register_repo(path.clone()).unwrap();
        assert_eq!(store.current_generation(&path).unwrap(), 0);
        store.bump_generation(&path).unwrap();
        assert_eq!(store.current_generation(&path).unwrap(), 1);
    }

    #[test]
    fn storage_context_prepares_directories() {
        let dir = tempfile::tempdir().unwrap();
        let context = StorageContext::new(dir.path()).unwrap();
        assert!(context.metadata_path().exists());
        context.metadata_store().unwrap();
    }
}
769}