1use gity_ipc::RepoStatus;
2use serde::{Deserialize, Serialize};
3use std::{
4 collections::HashMap,
5 fs,
6 path::{Path, PathBuf},
7 sync::RwLock,
8 time::{Duration, SystemTime, UNIX_EPOCH},
9};
10use thiserror::Error;
11use tracing::warn;
12
13const MAX_DIRTY_PATHS: usize = 10_000;
17
18pub type StorageResult<T> = Result<T, StorageError>;
19
20#[derive(Debug, Error, PartialEq, Eq)]
22pub enum StorageError {
23 #[error("repository not registered: {0}")]
24 NotFound(String),
25 #[error("internal locking error")]
26 Poisoned,
27 #[error("storage backend error: {0}")]
28 Backend(String),
29}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct RepoMetadata {
34 pub repo_path: PathBuf,
35 pub registered_at: SystemTime,
36 pub last_event: Option<SystemTime>,
37 pub status: RepoStatus,
38 pub pending_jobs: usize,
39 pub dirty_paths: Vec<PathBuf>,
40 pub generation: u64,
41 pub needs_reconciliation: Option<bool>,
42 pub last_watcher_token: Option<u64>,
43}
44
45impl RepoMetadata {
46 pub fn new(repo_path: PathBuf) -> Self {
47 Self {
48 repo_path,
49 registered_at: SystemTime::now(),
50 last_event: None,
51 status: RepoStatus::Idle,
52 pending_jobs: 0,
53 dirty_paths: Vec::new(),
54 generation: 0,
55 needs_reconciliation: Some(false),
56 last_watcher_token: None,
57 }
58 }
59}
60
61pub trait MetadataStore: Send + Sync + 'static {
63 fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata>;
64 fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>>;
65 fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>>;
66 fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>>;
67 fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()>;
68 fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata>;
69 fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()>;
70 fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()>;
71 fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>>;
72 fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize>;
73 fn current_generation(&self, repo_path: &Path) -> StorageResult<u64>;
74 fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64>;
75 fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()>;
76 fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()>;
77}
78
79pub struct InMemoryMetadataStore {
81 inner: RwLock<HashMap<PathBuf, RepoMetadata>>,
82}
83
84impl InMemoryMetadataStore {
85 pub fn new() -> Self {
86 Self {
87 inner: RwLock::new(HashMap::new()),
88 }
89 }
90}
91
92impl Default for InMemoryMetadataStore {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98impl MetadataStore for InMemoryMetadataStore {
99 fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata> {
100 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
101 let entry = guard
102 .entry(repo_path.clone())
103 .or_insert_with(|| RepoMetadata::new(repo_path));
104 Ok(entry.clone())
105 }
106
107 fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
108 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
109 Ok(guard.remove(repo_path))
110 }
111
112 fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
113 let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
114 Ok(guard.get(repo_path).cloned())
115 }
116
117 fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>> {
118 let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
119 Ok(guard.values().cloned().collect())
120 }
121
122 fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()> {
123 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
124 let entry = guard
125 .get_mut(repo_path)
126 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
127 entry.status = status;
128 Ok(())
129 }
130
131 fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata> {
132 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
133 let entry = guard
134 .get_mut(repo_path)
135 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
136 if delta >= 0 {
137 entry.pending_jobs = entry.pending_jobs.saturating_add(delta as usize);
138 } else {
139 entry.pending_jobs = entry.pending_jobs.saturating_sub(delta.unsigned_abs());
140 }
141 entry.status = job_status_for(entry.pending_jobs);
142 Ok(entry.clone())
143 }
144
145 fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()> {
146 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
147 let entry = guard
148 .get_mut(repo_path)
149 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
150 entry.last_event = Some(when);
151 Ok(())
152 }
153
154 fn current_generation(&self, repo_path: &Path) -> StorageResult<u64> {
155 let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
156 let entry = guard
157 .get(repo_path)
158 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
159 Ok(entry.generation)
160 }
161
162 fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64> {
163 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
164 let entry = guard
165 .get_mut(repo_path)
166 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
167 entry.generation = entry.generation.saturating_add(1);
168 Ok(entry.generation)
169 }
170
171 fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()> {
172 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
173 let entry = guard
174 .get_mut(repo_path)
175 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
176
177 if entry.dirty_paths.len() >= MAX_DIRTY_PATHS {
179 if entry.dirty_paths.first() != Some(&PathBuf::from(".")) {
181 warn!(
182 repo = %repo_path.display(),
183 "dirty_paths limit ({}) exceeded, marking entire repo dirty",
184 MAX_DIRTY_PATHS
185 );
186 entry.dirty_paths.clear();
187 entry.dirty_paths.push(PathBuf::from("."));
188 }
189 return Ok(());
190 }
191
192 if !entry.dirty_paths.contains(&path) {
193 entry.dirty_paths.push(path);
194 }
195 Ok(())
196 }
197
198 fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>> {
199 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
200 let entry = guard
201 .get_mut(repo_path)
202 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
203 Ok(std::mem::take(&mut entry.dirty_paths))
204 }
205
206 fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize> {
207 let guard = self.inner.read().map_err(|_| StorageError::Poisoned)?;
208 let entry = guard
209 .get(repo_path)
210 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
211 Ok(entry.dirty_paths.len())
212 }
213
214 fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()> {
215 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
216 let entry = guard
217 .get_mut(repo_path)
218 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
219 entry.needs_reconciliation = Some(needs);
220 Ok(())
221 }
222
223 fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()> {
224 let mut guard = self.inner.write().map_err(|_| StorageError::Poisoned)?;
225 let entry = guard
226 .get_mut(repo_path)
227 .ok_or_else(|| StorageError::NotFound(repo_path.display().to_string()))?;
228 entry.last_watcher_token = Some(token);
229 Ok(())
230 }
231}
232
233#[derive(Clone)]
235pub struct SledMetadataStore {
236 tree: sled::Tree,
237}
238
239impl SledMetadataStore {
240 pub fn open(path: impl AsRef<Path>) -> StorageResult<Self> {
241 let db = sled::open(path).map_err(map_sled_err)?;
242 let tree = db.open_tree("repos").map_err(map_sled_err)?;
243 Ok(Self { tree })
244 }
245
246 fn load_repo(&self, repo_path: &Path) -> StorageResult<RepoMetadata> {
247 let key = repo_key(repo_path);
248 let Some(bytes) = self.tree.get(&key).map_err(map_sled_err)? else {
249 return Err(StorageError::NotFound(repo_path.display().to_string()));
250 };
251 deserialize_record(bytes.as_ref())
252 }
253
254 fn write_repo(&self, metadata: &RepoMetadata) -> StorageResult<()> {
255 let key = repo_key(&metadata.repo_path);
256 let record = RepoRecord::from(metadata);
257 let bytes =
258 bincode::serialize(&record).map_err(|err| StorageError::Backend(err.to_string()))?;
259 self.tree.insert(key, bytes).map_err(map_sled_err)?;
260 Ok(())
261 }
262
263 fn update_repo<F>(&self, repo_path: &Path, mutator: F) -> StorageResult<RepoMetadata>
264 where
265 F: FnOnce(&mut RepoMetadata),
266 {
267 let mut current = self.load_repo(repo_path)?;
268 mutator(&mut current);
269 self.write_repo(¤t)?;
270 Ok(current)
271 }
272}
273
274impl MetadataStore for SledMetadataStore {
275 fn register_repo(&self, repo_path: PathBuf) -> StorageResult<RepoMetadata> {
276 let key = repo_key(&repo_path);
277 if let Some(existing) = self.tree.get(&key).map_err(map_sled_err)? {
278 return deserialize_record(existing.as_ref());
279 }
280 let metadata = RepoMetadata::new(repo_path);
281 self.write_repo(&metadata)?;
282 Ok(metadata)
283 }
284
285 fn unregister_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
286 let key = repo_key(repo_path);
287 let result = self.tree.remove(&key).map_err(map_sled_err)?;
288 Ok(match result {
289 Some(bytes) => Some(deserialize_record(bytes.as_ref())?),
290 None => None,
291 })
292 }
293
294 fn get_repo(&self, repo_path: &Path) -> StorageResult<Option<RepoMetadata>> {
295 let key = repo_key(repo_path);
296 match self.tree.get(&key).map_err(map_sled_err)? {
297 Some(bytes) => Ok(Some(deserialize_record(bytes.as_ref())?)),
298 None => Ok(None),
299 }
300 }
301
302 fn list_repos(&self) -> StorageResult<Vec<RepoMetadata>> {
303 let mut repos = Vec::new();
304 for entry in self.tree.iter() {
305 let (_, value) = entry.map_err(map_sled_err)?;
306 repos.push(deserialize_record(value.as_ref())?);
307 }
308 Ok(repos)
309 }
310
311 fn update_repo_status(&self, repo_path: &Path, status: RepoStatus) -> StorageResult<()> {
312 self.update_repo(repo_path, |meta| meta.status = status)?;
313 Ok(())
314 }
315
316 fn increment_jobs(&self, repo_path: &Path, delta: isize) -> StorageResult<RepoMetadata> {
317 self.update_repo(repo_path, |meta| {
318 if delta >= 0 {
319 meta.pending_jobs = meta.pending_jobs.saturating_add(delta as usize);
320 } else {
321 meta.pending_jobs = meta.pending_jobs.saturating_sub(delta.unsigned_abs());
322 }
323 meta.status = job_status_for(meta.pending_jobs);
324 })
325 }
326
327 fn record_event(&self, repo_path: &Path, when: SystemTime) -> StorageResult<()> {
328 self.update_repo(repo_path, |meta| meta.last_event = Some(when))?;
329 Ok(())
330 }
331
332 fn mark_dirty_path(&self, repo_path: &Path, path: PathBuf) -> StorageResult<()> {
333 self.update_repo(repo_path, |meta| {
334 if meta.dirty_paths.len() >= MAX_DIRTY_PATHS {
336 if meta.dirty_paths.first() != Some(&PathBuf::from(".")) {
338 warn!(
339 repo = %repo_path.display(),
340 "dirty_paths limit ({}) exceeded, marking entire repo dirty",
341 MAX_DIRTY_PATHS
342 );
343 meta.dirty_paths.clear();
344 meta.dirty_paths.push(PathBuf::from("."));
345 }
346 return;
347 }
348
349 if !meta.dirty_paths.contains(&path) {
350 meta.dirty_paths.push(path);
351 }
352 })?;
353 Ok(())
354 }
355
356 fn drain_dirty_paths(&self, repo_path: &Path) -> StorageResult<Vec<PathBuf>> {
357 let mut removed = Vec::new();
358 self.update_repo(repo_path, |meta| {
359 removed = std::mem::take(&mut meta.dirty_paths);
360 })?;
361 Ok(removed)
362 }
363
364 fn current_generation(&self, repo_path: &Path) -> StorageResult<u64> {
365 self.load_repo(repo_path).map(|meta| meta.generation)
366 }
367
368 fn bump_generation(&self, repo_path: &Path) -> StorageResult<u64> {
369 let mut generation = 0;
370 self.update_repo(repo_path, |meta| {
371 meta.generation = meta.generation.saturating_add(1);
372 generation = meta.generation;
373 })?;
374 Ok(generation)
375 }
376
377 fn dirty_path_count(&self, repo_path: &Path) -> StorageResult<usize> {
378 self.load_repo(repo_path).map(|meta| meta.dirty_paths.len())
379 }
380
381 fn set_needs_reconciliation(&self, repo_path: &Path, needs: bool) -> StorageResult<()> {
382 self.update_repo(repo_path, |meta| {
383 meta.needs_reconciliation = Some(needs);
384 })?;
385 Ok(())
386 }
387
388 fn set_watcher_token(&self, repo_path: &Path, token: u64) -> StorageResult<()> {
389 self.update_repo(repo_path, |meta| {
390 meta.last_watcher_token = Some(token);
391 })?;
392 Ok(())
393 }
394}
395
396fn repo_key(path: &Path) -> Vec<u8> {
397 path.to_string_lossy().as_bytes().to_vec()
398}
399
400fn job_status_for(pending: usize) -> RepoStatus {
401 if pending > 0 {
402 RepoStatus::Busy
403 } else {
404 RepoStatus::Idle
405 }
406}
407
408#[derive(Serialize, Deserialize)]
409struct RepoRecord {
410 repo_path: PathBuf,
411 registered_at: u64,
412 last_event: Option<u64>,
413 status: RepoStatus,
414 pending_jobs: usize,
415 dirty_paths: Vec<PathBuf>,
416 generation: u64,
417 #[serde(default)]
418 needs_reconciliation: Option<bool>,
419 #[serde(default)]
420 last_watcher_token: Option<u64>,
421}
422
423impl From<&RepoMetadata> for RepoRecord {
424 fn from(value: &RepoMetadata) -> Self {
425 Self {
426 repo_path: value.repo_path.clone(),
427 registered_at: encode_time(value.registered_at),
428 last_event: value.last_event.map(encode_time),
429 status: value.status.clone(),
430 pending_jobs: value.pending_jobs,
431 dirty_paths: value.dirty_paths.clone(),
432 generation: value.generation,
433 needs_reconciliation: value.needs_reconciliation,
434 last_watcher_token: value.last_watcher_token,
435 }
436 }
437}
438
439impl From<RepoRecord> for RepoMetadata {
440 fn from(value: RepoRecord) -> Self {
441 Self {
442 repo_path: value.repo_path,
443 registered_at: decode_time(value.registered_at),
444 last_event: value.last_event.map(decode_time),
445 status: value.status,
446 pending_jobs: value.pending_jobs,
447 dirty_paths: value.dirty_paths,
448 generation: value.generation,
449 needs_reconciliation: value.needs_reconciliation,
450 last_watcher_token: value.last_watcher_token,
451 }
452 }
453}
454
455fn encode_time(time: SystemTime) -> u64 {
456 time.duration_since(UNIX_EPOCH)
457 .unwrap_or_else(|_| Duration::from_secs(0))
458 .as_secs()
459}
460
461fn decode_time(secs: u64) -> SystemTime {
462 UNIX_EPOCH + Duration::from_secs(secs)
463}
464
465fn deserialize_record(bytes: &[u8]) -> StorageResult<RepoMetadata> {
466 let record: RepoRecord =
467 bincode::deserialize(bytes).map_err(|err| StorageError::Backend(err.to_string()))?;
468 Ok(record.into())
469}
470
471fn map_sled_err<E: std::fmt::Display>(err: E) -> StorageError {
472 StorageError::Backend(err.to_string())
473}
474
475#[derive(Debug, Clone)]
478pub struct StorageContext {
479 metadata_path: PathBuf,
480 log_path: PathBuf,
481}
482
483#[derive(Debug, Clone)]
485pub struct DbStats {
486 pub metadata_size_bytes: u64,
488 pub logs_size_bytes: u64,
490 pub repo_count: usize,
492 pub log_entry_count: usize,
494}
495
496impl StorageContext {
497 pub fn new(data_root: impl AsRef<Path>) -> StorageResult<Self> {
499 let metadata_path = data_root.as_ref().join("sled");
500 let log_path = data_root.as_ref().join("logs");
501 fs::create_dir_all(&metadata_path).map_err(map_sled_err)?;
502 fs::create_dir_all(&log_path).map_err(map_sled_err)?;
503 Ok(Self {
504 metadata_path,
505 log_path,
506 })
507 }
508
509 pub fn metadata_store(&self) -> StorageResult<SledMetadataStore> {
511 SledMetadataStore::open(&self.metadata_path)
512 }
513
514 pub fn log_tree(&self) -> StorageResult<sled::Tree> {
515 let db = sled::open(&self.log_path).map_err(map_sled_err)?;
516 db.open_tree("logs").map_err(map_sled_err)
517 }
518
519 pub fn metadata_path(&self) -> &Path {
520 &self.metadata_path
521 }
522
523 pub fn log_path(&self) -> &Path {
524 &self.log_path
525 }
526
527 pub fn compact_all(&self) -> StorageResult<()> {
529 let meta_db = sled::open(&self.metadata_path).map_err(map_sled_err)?;
531 meta_db.flush().map_err(map_sled_err)?;
532
533 let log_db = sled::open(&self.log_path).map_err(map_sled_err)?;
535 log_db.flush().map_err(map_sled_err)?;
536
537 Ok(())
538 }
539
540 pub fn stats(&self) -> StorageResult<DbStats> {
542 let metadata_size_bytes = dir_size(&self.metadata_path);
543 let logs_size_bytes = dir_size(&self.log_path);
544
545 let store = self.metadata_store()?;
546 let repo_count = store.list_repos()?.len();
547
548 let log_tree = self.log_tree()?;
549 let log_entry_count = log_tree.len();
550
551 Ok(DbStats {
552 metadata_size_bytes,
553 logs_size_bytes,
554 repo_count,
555 log_entry_count,
556 })
557 }
558
559 pub fn prune_old_log_entries(&self, max_age: Duration) -> StorageResult<usize> {
562 let log_tree = self.log_tree()?;
563 let cutoff = SystemTime::now()
564 .checked_sub(max_age)
565 .unwrap_or(UNIX_EPOCH);
566 let cutoff_nanos = cutoff
567 .duration_since(UNIX_EPOCH)
568 .unwrap_or_else(|_| Duration::from_secs(0))
569 .as_nanos();
570
571 let mut pruned = 0;
572 let keys_to_remove: Vec<_> = log_tree
573 .iter()
574 .filter_map(|result| result.ok())
575 .filter_map(|(key, _)| {
576 if key.len() < 17 {
579 return None; }
581 let ts_bytes: [u8; 16] = key[key.len() - 16..].try_into().ok()?;
582 let timestamp_nanos = u128::from_be_bytes(ts_bytes);
583 if timestamp_nanos < cutoff_nanos {
584 Some(key)
585 } else {
586 None
587 }
588 })
589 .collect();
590
591 for key in keys_to_remove {
592 if log_tree.remove(&key).is_ok() {
593 pruned += 1;
594 }
595 }
596
597 if pruned > 0 {
599 let db = sled::open(&self.log_path).map_err(map_sled_err)?;
600 db.flush().map_err(map_sled_err)?;
601 }
602
603 Ok(pruned)
604 }
605}
606
607fn dir_size(path: &Path) -> u64 {
609 let mut total = 0;
610 if let Ok(entries) = fs::read_dir(path) {
611 for entry in entries.flatten() {
612 let entry_path = entry.path();
613 if entry_path.is_dir() {
614 total += dir_size(&entry_path);
615 } else if let Ok(metadata) = entry.metadata() {
616 total += metadata.len();
617 }
618 }
619 }
620 total
621}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626 use std::time::UNIX_EPOCH;
627
628 #[test]
629 fn register_and_list_repositories() {
630 let store = InMemoryMetadataStore::new();
631 store
632 .register_repo(PathBuf::from("/tmp/demo"))
633 .expect("register");
634 let repos = store.list_repos().expect("list");
635 assert_eq!(repos.len(), 1);
636 assert_eq!(repos[0].repo_path, PathBuf::from("/tmp/demo"));
637 }
638
639 #[test]
640 fn unregister_repository() {
641 let store = InMemoryMetadataStore::new();
642 let path = PathBuf::from("/tmp/demo");
643 store.register_repo(path.clone()).unwrap();
644 let removed = store.unregister_repo(&path).unwrap();
645 assert!(removed.is_some());
646 assert!(store.unregister_repo(&path).unwrap().is_none());
647 }
648
649 #[test]
650 fn job_counters_do_not_underflow() {
651 let store = InMemoryMetadataStore::new();
652 let path = PathBuf::from("/tmp/demo");
653 store.register_repo(path.clone()).unwrap();
654 store.increment_jobs(&path, 5).unwrap();
655 let snapshot = store.increment_jobs(&path, -10).unwrap();
656 assert_eq!(snapshot.pending_jobs, 0);
657 }
658
659 #[test]
660 fn in_memory_job_status_tracks_pending_jobs() {
661 let store = InMemoryMetadataStore::new();
662 let path = PathBuf::from("/tmp/demo");
663 let initial = store.register_repo(path.clone()).unwrap();
664 assert_eq!(initial.status, RepoStatus::Idle);
665 let snapshot = store.increment_jobs(&path, 1).unwrap();
666 assert_eq!(snapshot.status, RepoStatus::Busy);
667 let snapshot = store.increment_jobs(&path, -1).unwrap();
668 assert_eq!(snapshot.status, RepoStatus::Idle);
669 }
670
671 #[test]
672 fn record_last_event() {
673 let store = InMemoryMetadataStore::new();
674 let path = PathBuf::from("/tmp/demo");
675 store.register_repo(path.clone()).unwrap();
676 store.record_event(&path, UNIX_EPOCH).unwrap();
677 let repos = store.list_repos().unwrap();
678 assert_eq!(repos[0].last_event, Some(UNIX_EPOCH));
679 }
680
681 #[test]
682 fn sled_store_persists_between_instances() {
683 let dir = tempfile::tempdir().unwrap();
684 let db_path = dir.path().join("db");
685 {
686 let store = SledMetadataStore::open(&db_path).unwrap();
687 store
688 .register_repo(PathBuf::from("/tmp/demo"))
689 .expect("register");
690 }
691 {
692 let store = SledMetadataStore::open(&db_path).unwrap();
693 let repos = store.list_repos().unwrap();
694 assert_eq!(repos.len(), 1);
695 assert_eq!(repos[0].repo_path, PathBuf::from("/tmp/demo"));
696 }
697 }
698
699 #[test]
700 fn sled_store_updates_jobs() {
701 let dir = tempfile::tempdir().unwrap();
702 let store = SledMetadataStore::open(dir.path()).unwrap();
703 let path = PathBuf::from("/tmp/demo");
704 store.register_repo(path.clone()).unwrap();
705 store.increment_jobs(&path, 2).unwrap();
706 let snapshot = store.increment_jobs(&path, -1).unwrap();
707 assert_eq!(snapshot.pending_jobs, 1);
708 }
709
710 #[test]
711 fn sled_job_status_tracks_pending_jobs() {
712 let dir = tempfile::tempdir().unwrap();
713 let store = SledMetadataStore::open(dir.path()).unwrap();
714 let path = PathBuf::from("/tmp/demo");
715 let initial = store.register_repo(path.clone()).unwrap();
716 assert_eq!(initial.status, RepoStatus::Idle);
717 let snapshot = store.increment_jobs(&path, 1).unwrap();
718 assert_eq!(snapshot.status, RepoStatus::Busy);
719 let snapshot = store.increment_jobs(&path, -1).unwrap();
720 assert_eq!(snapshot.status, RepoStatus::Idle);
721 }
722
723 #[test]
724 fn dirty_paths_track_changes_in_memory() {
725 let store = InMemoryMetadataStore::new();
726 let path = PathBuf::from("/tmp/demo");
727 store.register_repo(path.clone()).unwrap();
728 store
729 .mark_dirty_path(&path, PathBuf::from("file.txt"))
730 .unwrap();
731 store
732 .mark_dirty_path(&path, PathBuf::from("file.txt"))
733 .unwrap();
734 let dirty = store.drain_dirty_paths(&path).unwrap();
735 assert_eq!(dirty, vec![PathBuf::from("file.txt")]);
736 assert!(store.drain_dirty_paths(&path).unwrap().is_empty());
737 }
738
739 #[test]
740 fn dirty_paths_persist_in_sled() {
741 let dir = tempfile::tempdir().unwrap();
742 let store = SledMetadataStore::open(dir.path()).unwrap();
743 let path = PathBuf::from("/tmp/demo");
744 store.register_repo(path.clone()).unwrap();
745 store
746 .mark_dirty_path(&path, PathBuf::from("a.txt"))
747 .unwrap();
748 let drained = store.drain_dirty_paths(&path).unwrap();
749 assert_eq!(drained, vec![PathBuf::from("a.txt")]);
750 }
751
752 #[test]
753 fn generation_counters_increment() {
754 let store = InMemoryMetadataStore::new();
755 let path = PathBuf::from("/tmp/demo");
756 store.register_repo(path.clone()).unwrap();
757 assert_eq!(store.current_generation(&path).unwrap(), 0);
758 store.bump_generation(&path).unwrap();
759 assert_eq!(store.current_generation(&path).unwrap(), 1);
760 }
761
762 #[test]
763 fn storage_context_prepares_directories() {
764 let dir = tempfile::tempdir().unwrap();
765 let context = StorageContext::new(dir.path()).unwrap();
766 assert!(context.metadata_path().exists());
767 context.metadata_store().unwrap();
768 }
769}