1use flate2::read::ZlibDecoder;
13use flate2::write::ZlibEncoder;
14use flate2::Compression;
15use hashtree_config::{Config, StorageBackend};
16use hashtree_core::store::{Store, StoreError, StoreStats};
17use hashtree_core::types::Hash;
18use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType};
19use hashtree_fs::FsBlobStore;
20#[cfg(feature = "lmdb")]
21use hashtree_lmdb::LmdbBlobStore;
22use sha1::{Digest, Sha1};
23use std::collections::HashMap;
24use std::io::{Read, Write};
25use std::path::Path;
26use std::sync::Arc;
27use tokio::runtime::{Handle, Runtime};
28use tracing::{debug, info, warn};
29
30use super::object::{parse_tree, GitObject, ObjectId, ObjectType};
31use super::refs::{validate_ref_name, Ref};
32use super::{Error, Result};
33
/// Boxed, type-erased future used to let the async tree walk recurse
/// (`build_working_tree_entries` calls itself via this indirection, since
/// plain `async fn`s cannot be directly recursive).
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
36
/// How synchronous entry points drive async work: either a tokio runtime we
/// created ourselves (no ambient runtime at construction time), or a handle
/// borrowed from the caller's already-running runtime.
enum RuntimeExecutor {
    // Runtime owned by this storage instance.
    Owned(Runtime),
    // Handle to the caller's ambient runtime.
    Handle(Handle),
}
42
impl RuntimeExecutor {
    /// Drive `f` to completion on whichever runtime this executor wraps.
    ///
    /// For the `Handle` case, `block_in_place` tells tokio this worker is
    /// about to block so the scheduler can compensate.
    /// NOTE(review): `block_in_place` panics on a current-thread runtime —
    /// assumes the ambient runtime is multi-threaded; confirm with callers.
    fn block_on<F: std::future::Future>(&self, f: F) -> F::Output {
        match self {
            RuntimeExecutor::Owned(rt) => rt.block_on(f),
            RuntimeExecutor::Handle(handle) => tokio::task::block_in_place(|| handle.block_on(f)),
        }
    }
}
51
/// Content-addressed local blob store, with the backend chosen at runtime
/// from config: plain filesystem, or LMDB when the `lmdb` feature is
/// compiled in.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
58
59impl LocalStore {
60 pub fn new<P: AsRef<Path>>(path: P) -> std::result::Result<Self, StoreError> {
62 Self::new_with_max_bytes(path, 0)
63 }
64
65 pub fn new_with_max_bytes<P: AsRef<Path>>(
67 path: P,
68 max_bytes: u64,
69 ) -> std::result::Result<Self, StoreError> {
70 let config = Config::load_or_default();
71 match config.storage.backend {
72 StorageBackend::Fs => {
73 if max_bytes > 0 {
74 Ok(LocalStore::Fs(FsBlobStore::with_max_bytes(
75 path, max_bytes,
76 )?))
77 } else {
78 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
79 }
80 }
81 #[cfg(feature = "lmdb")]
82 StorageBackend::Lmdb => {
83 if max_bytes > 0 {
84 warn!(
85 "LMDB backend ignores git cache eviction limits; configured limit will not be enforced"
86 );
87 }
88 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
89 }
90 #[cfg(not(feature = "lmdb"))]
91 StorageBackend::Lmdb => {
92 warn!(
93 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
94 );
95 if max_bytes > 0 {
96 Ok(LocalStore::Fs(FsBlobStore::with_max_bytes(
97 path, max_bytes,
98 )?))
99 } else {
100 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
101 }
102 }
103 }
104 }
105
106 pub fn list(&self) -> std::result::Result<Vec<Hash>, StoreError> {
108 match self {
109 LocalStore::Fs(store) => store.list(),
110 #[cfg(feature = "lmdb")]
111 LocalStore::Lmdb(store) => store.list(),
112 }
113 }
114
115 pub fn get_sync(&self, hash: &Hash) -> std::result::Result<Option<Vec<u8>>, StoreError> {
117 match self {
118 LocalStore::Fs(store) => store.get_sync(hash),
119 #[cfg(feature = "lmdb")]
120 LocalStore::Lmdb(store) => store.get_sync(hash),
121 }
122 }
123}
124
// All `Store` operations are plain delegation to the active backend variant.
#[async_trait::async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put(hash, data).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put(hash, data).await,
        }
    }

    async fn get(&self, hash: &Hash) -> std::result::Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get(hash).await,
        }
    }

    async fn has(&self, hash: &Hash) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.has(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.has(hash).await,
        }
    }

    async fn delete(&self, hash: &Hash) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete(hash).await,
        }
    }

    fn set_max_bytes(&self, max: u64) {
        match self {
            LocalStore::Fs(store) => store.set_max_bytes(max),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.set_max_bytes(max),
        }
    }

    fn max_bytes(&self) -> Option<u64> {
        match self {
            LocalStore::Fs(store) => store.max_bytes(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.max_bytes(),
        }
    }

    /// Aggregate store statistics. A backend error degrades to zeroed
    /// default stats rather than failing the call. The LMDB backend does
    /// not surface pinning stats, so its pinned counts are reported as 0.
    async fn stats(&self) -> StoreStats {
        match self {
            LocalStore::Fs(store) => match store.stats() {
                Ok(stats) => StoreStats {
                    count: stats.count as u64,
                    bytes: stats.total_bytes,
                    pinned_count: stats.pinned_count as u64,
                    pinned_bytes: stats.pinned_bytes,
                },
                Err(_) => StoreStats::default(),
            },
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => match store.stats() {
                Ok(stats) => StoreStats {
                    count: stats.count as u64,
                    bytes: stats.total_bytes,
                    pinned_count: 0,
                    pinned_bytes: 0,
                },
                Err(_) => StoreStats::default(),
            },
        }
    }

    async fn evict_if_needed(&self) -> std::result::Result<u64, StoreError> {
        match self {
            LocalStore::Fs(store) => store.evict_if_needed().await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.evict_if_needed().await,
        }
    }
}
207
/// A git repository staged in memory (loose objects + refs) and materialized
/// into a content-addressed hashtree on demand via `build_tree`.
pub struct GitStorage {
    // Shared local blob store backing the hashtree.
    store: Arc<LocalStore>,
    // Hashtree builder writing into `store`.
    tree: HashTree<LocalStore>,
    // Executor used to run async store/tree operations from sync methods.
    runtime: RuntimeExecutor,
    // hex object id -> zlib-compressed loose object bytes.
    objects: std::sync::RwLock<HashMap<String, Vec<u8>>>,
    // ref name (e.g. "refs/heads/main", "HEAD") -> raw stored value.
    refs: std::sync::RwLock<HashMap<String, String>>,
    // Cached result of the last `build_tree`; cleared on any mutation.
    root_cid: std::sync::RwLock<Option<Cid>>,
}
219
220impl GitStorage {
221 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
223 let config = Config::load_or_default();
224 let max_size_bytes = config
225 .storage
226 .max_size_gb
227 .saturating_mul(1024 * 1024 * 1024);
228 Self::open_with_max_bytes(path, max_size_bytes)
229 }
230
    /// Open storage rooted at `path` with an explicit blob-cache limit in
    /// bytes (0 = unlimited).
    ///
    /// Reuses the ambient tokio runtime when one is running on the calling
    /// thread; otherwise creates a dedicated runtime owned by this instance.
    pub fn open_with_max_bytes(path: impl AsRef<Path>, max_size_bytes: u64) -> Result<Self> {
        let runtime = match Handle::try_current() {
            Ok(handle) => RuntimeExecutor::Handle(handle),
            Err(_) => {
                let rt = Runtime::new()
                    .map_err(|e| Error::StorageError(format!("tokio runtime: {}", e)))?;
                RuntimeExecutor::Owned(rt)
            }
        };

        // Blobs live under "<path>/blobs"; backend selection happens inside
        // LocalStore based on config.
        let store_path = path.as_ref().join("blobs");
        let store = Arc::new(
            LocalStore::new_with_max_bytes(&store_path, max_size_bytes)
                .map_err(|e| Error::StorageError(format!("local store: {}", e)))?,
        );

        let tree = HashTree::new(HashTreeConfig::new(store.clone()));

        Ok(Self {
            store,
            tree,
            runtime,
            objects: std::sync::RwLock::new(HashMap::new()),
            refs: std::sync::RwLock::new(HashMap::new()),
            root_cid: std::sync::RwLock::new(None),
        })
    }
260
261 pub fn evict_if_needed(&self) -> Result<u64> {
263 self.runtime
264 .block_on(self.store.evict_if_needed())
265 .map_err(|e| Error::StorageError(format!("evict: {}", e)))
266 }
267
268 fn write_object(&self, obj: &GitObject) -> Result<ObjectId> {
270 let oid = obj.id();
271 let key = oid.to_hex();
272
273 let loose = obj.to_loose_format();
274 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
275 encoder.write_all(&loose)?;
276 let compressed = encoder.finish()?;
277
278 let mut objects = self
279 .objects
280 .write()
281 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
282 objects.insert(key, compressed);
283
284 if let Ok(mut root) = self.root_cid.write() {
286 *root = None;
287 }
288
289 Ok(oid)
290 }
291
292 pub fn write_raw_object(&self, obj_type: ObjectType, content: &[u8]) -> Result<ObjectId> {
294 let obj = GitObject::new(obj_type, content.to_vec());
295 self.write_object(&obj)
296 }
297
298 #[allow(dead_code)]
300 fn read_object(&self, oid: &ObjectId) -> Result<GitObject> {
301 let key = oid.to_hex();
302 let objects = self
303 .objects
304 .read()
305 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
306 let compressed = objects
307 .get(&key)
308 .ok_or_else(|| Error::ObjectNotFound(key.clone()))?;
309
310 let mut decoder = ZlibDecoder::new(compressed.as_slice());
311 let mut data = Vec::new();
312 decoder.read_to_end(&mut data)?;
313
314 GitObject::from_loose_format(&data)
315 }
316
317 pub fn write_ref(&self, name: &str, target: &Ref) -> Result<()> {
319 validate_ref_name(name)?;
320
321 let value = match target {
322 Ref::Direct(oid) => oid.to_hex(),
323 Ref::Symbolic(target) => format!("ref: {}", target),
324 };
325
326 let mut refs = self
327 .refs
328 .write()
329 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
330 refs.insert(name.to_string(), value);
331
332 if let Ok(mut root) = self.root_cid.write() {
334 *root = None;
335 }
336
337 Ok(())
338 }
339
340 #[allow(dead_code)]
342 pub fn read_ref(&self, name: &str) -> Result<Option<Ref>> {
343 let refs = self
344 .refs
345 .read()
346 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
347
348 match refs.get(name) {
349 Some(value) => {
350 if let Some(target) = value.strip_prefix("ref: ") {
351 Ok(Some(Ref::Symbolic(target.to_string())))
352 } else {
353 let oid = ObjectId::from_hex(value)
354 .ok_or_else(|| Error::StorageError(format!("invalid ref: {}", value)))?;
355 Ok(Some(Ref::Direct(oid)))
356 }
357 }
358 None => Ok(None),
359 }
360 }
361
362 #[allow(dead_code)]
364 pub fn list_refs(&self) -> Result<HashMap<String, String>> {
365 let refs = self
366 .refs
367 .read()
368 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
369 Ok(refs.clone())
370 }
371
372 pub fn delete_ref(&self, name: &str) -> Result<bool> {
374 let mut refs = self
375 .refs
376 .write()
377 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
378 let existed = refs.remove(name).is_some();
379
380 if let Ok(mut root) = self.root_cid.write() {
382 *root = None;
383 }
384
385 Ok(existed)
386 }
387
388 pub fn import_compressed_object(&self, oid: &str, compressed_data: Vec<u8>) -> Result<()> {
391 let mut objects = self
392 .objects
393 .write()
394 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
395 objects.insert(oid.to_string(), compressed_data);
396
397 if let Ok(mut root) = self.root_cid.write() {
399 *root = None;
400 }
401
402 Ok(())
403 }
404
405 pub fn import_ref(&self, name: &str, value: &str) -> Result<()> {
407 let mut refs = self
408 .refs
409 .write()
410 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
411 refs.insert(name.to_string(), value.to_string());
412
413 if let Ok(mut root) = self.root_cid.write() {
415 *root = None;
416 }
417
418 Ok(())
419 }
420
421 #[cfg(test)]
423 pub fn has_ref(&self, name: &str) -> Result<bool> {
424 let refs = self
425 .refs
426 .read()
427 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
428 Ok(refs.contains_key(name))
429 }
430
431 #[cfg(test)]
433 pub fn object_count(&self) -> Result<usize> {
434 let objects = self
435 .objects
436 .read()
437 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
438 Ok(objects.len())
439 }
440
441 #[allow(dead_code)]
443 pub fn get_root_cid(&self) -> Result<Option<Cid>> {
444 let root = self
445 .root_cid
446 .read()
447 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
448 Ok(root.clone())
449 }
450
451 #[allow(dead_code)]
453 pub fn default_branch(&self) -> Result<Option<String>> {
454 let refs = self
455 .refs
456 .read()
457 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
458
459 if let Some(head) = refs.get("HEAD") {
460 if let Some(target) = head.strip_prefix("ref: ") {
461 return Ok(Some(target.to_string()));
462 }
463 }
464 Ok(None)
465 }
466
467 fn get_commit_tree(
469 &self,
470 commit_oid: &str,
471 objects: &HashMap<String, Vec<u8>>,
472 ) -> Option<String> {
473 let compressed = objects.get(commit_oid)?;
474
475 let mut decoder = ZlibDecoder::new(&compressed[..]);
477 let mut decompressed = Vec::new();
478 decoder.read_to_end(&mut decompressed).ok()?;
479
480 let null_pos = decompressed.iter().position(|&b| b == 0)?;
482 let content = &decompressed[null_pos + 1..];
483
484 let content_str = std::str::from_utf8(content).ok()?;
486 let first_line = content_str.lines().next()?;
487 first_line
488 .strip_prefix("tree ")
489 .map(|tree_hash| tree_hash.to_string())
490 }
491
492 fn get_object_content(
494 &self,
495 oid: &str,
496 objects: &HashMap<String, Vec<u8>>,
497 ) -> Option<(ObjectType, Vec<u8>)> {
498 let compressed = objects.get(oid)?;
499
500 let mut decoder = ZlibDecoder::new(&compressed[..]);
502 let mut decompressed = Vec::new();
503 decoder.read_to_end(&mut decompressed).ok()?;
504
505 let null_pos = decompressed.iter().position(|&b| b == 0)?;
507 let header = std::str::from_utf8(&decompressed[..null_pos]).ok()?;
508 let obj_type = if header.starts_with("blob") {
509 ObjectType::Blob
510 } else if header.starts_with("tree") {
511 ObjectType::Tree
512 } else if header.starts_with("commit") {
513 ObjectType::Commit
514 } else {
515 return None;
516 };
517 let content = decompressed[null_pos + 1..].to_vec();
518 Some((obj_type, content))
519 }
520
    /// Materialize the staged repository into the hashtree and return the
    /// root CID. The result is cached; any mutation (object/ref write)
    /// clears the cache so the next call rebuilds.
    ///
    /// Layout produced: `.git/{HEAD, config?, index?, objects/, refs/}` plus
    /// the checked-out working tree files at the root.
    pub fn build_tree(&self) -> Result<Cid> {
        // Fast path: return the cached root if nothing changed since the
        // last build (a poisoned lock just skips the cache).
        if let Ok(root) = self.root_cid.read() {
            if let Some(ref cid) = *root {
                return Ok(cid.clone());
            }
        }

        // Hold read locks on both maps for the whole build so the snapshot
        // is consistent.
        let objects = self
            .objects
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let refs = self
            .refs
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;

        // Determine the default branch and its tip commit: prefer a symbolic
        // HEAD; otherwise fall back to an arbitrary "refs/heads/*" ref.
        let (default_branch, commit_sha) = if let Some(head) = refs.get("HEAD") {
            let branch = head.strip_prefix("ref: ").map(String::from);
            let sha = branch.as_ref().and_then(|b| refs.get(b)).cloned();
            (branch, sha)
        } else {
            let mut branch_info: Option<(String, String)> = None;
            for (ref_name, sha) in refs.iter() {
                if ref_name.starts_with("refs/heads/") {
                    branch_info = Some((ref_name.clone(), sha.clone()));
                    break;
                }
            }
            match branch_info {
                Some((branch, sha)) => (Some(branch), Some(sha)),
                None => (None, None),
            }
        };

        // Root tree sha of the tip commit, if resolvable.
        let tree_sha = commit_sha
            .as_ref()
            .and_then(|sha| self.get_commit_tree(sha, &objects));

        let objects_clone = objects.clone();

        let root_cid = self.runtime.block_on(async {
            let objects_cid = self.build_objects_dir(&objects).await?;

            let refs_cid = self.build_refs_dir(&refs).await?;

            // HEAD file content: stored value (newline-terminated), else a
            // symref to the detected branch, else "refs/heads/main".
            let head_content = refs.get("HEAD")
                .map(|h| if h.ends_with('\n') { h.clone() } else { format!("{}\n", h) })
                .or_else(|| default_branch.as_ref().map(|b| format!("ref: {}\n", b)))
                .unwrap_or_else(|| "ref: refs/heads/main\n".to_string());
            debug!("HEAD content: {:?}", head_content);
            let (head_cid, head_size) = self.tree.put(head_content.as_bytes()).await
                .map_err(|e| Error::StorageError(format!("put HEAD: {}", e)))?;
            debug!("HEAD hash: {}", hex::encode(head_cid.hash));

            let mut git_entries = vec![
                DirEntry::from_cid("HEAD", &head_cid).with_size(head_size),
                DirEntry::from_cid("objects", &objects_cid).with_link_type(LinkType::Dir),
                DirEntry::from_cid("refs", &refs_cid).with_link_type(LinkType::Dir),
            ];

            // Minimal bare-repo config declaring the default branch.
            if let Some(ref branch) = default_branch {
                let config = format!(
                    "[core]\n\trepositoryformatversion = 0\n\tfilemode = true\n\tbare = true\n[init]\n\tdefaultBranch = {}\n",
                    branch.trim_start_matches("refs/heads/")
                );
                let (config_cid, config_size) = self.tree.put(config.as_bytes()).await
                    .map_err(|e| Error::StorageError(format!("put config: {}", e)))?;
                git_entries.push(DirEntry::from_cid("config", &config_cid).with_size(config_size));
            }

            // Index is best effort: a failure logs and continues without it.
            if let Some(ref tree_oid) = tree_sha {
                match self.build_index_file(tree_oid, &objects_clone) {
                    Ok(index_data) => {
                        let (index_cid, index_size) = self.tree.put(&index_data).await
                            .map_err(|e| Error::StorageError(format!("put index: {}", e)))?;
                        git_entries.push(DirEntry::from_cid("index", &index_cid).with_size(index_size));
                        info!("Added git index file ({} bytes)", index_data.len());
                    }
                    Err(e) => {
                        debug!("Failed to build git index file: {} - continuing without index", e);
                    }
                }
            }

            let git_cid = self.tree.put_directory(git_entries).await
                .map_err(|e| Error::StorageError(format!("put .git: {}", e)))?;

            let mut root_entries = vec![DirEntry::from_cid(".git", &git_cid).with_link_type(LinkType::Dir)];

            // Check out the working tree next to .git.
            if let Some(ref tree_oid) = tree_sha {
                let working_tree_entries = self.build_working_tree_entries(tree_oid, &objects_clone).await?;
                root_entries.extend(working_tree_entries);
                info!("Added {} working tree entries to root", root_entries.len() - 1);
            }

            // Sort for deterministic directory hashing.
            root_entries.sort_by(|a, b| a.name.cmp(&b.name));

            let root_cid = self.tree.put_directory(root_entries).await
                .map_err(|e| Error::StorageError(format!("put root: {}", e)))?;

            info!("Built hashtree root: {} (encrypted: {}) (.git dir: {})",
                hex::encode(root_cid.hash),
                root_cid.key.is_some(),
                hex::encode(git_cid.hash));

            Ok::<Cid, Error>(root_cid)
        })?;

        // Cache the freshly-built root (best effort on a poisoned lock).
        if let Ok(mut root) = self.root_cid.write() {
            *root = Some(root_cid.clone());
        }

        Ok(root_cid)
    }
653
    /// Recursively convert the git tree `tree_oid` into hashtree
    /// `DirEntry`s: blobs become content entries, subtrees become
    /// subdirectories.
    ///
    /// Blob entries whose objects are missing or not actually blobs are
    /// silently skipped (best effort) rather than aborting the build; a
    /// missing or non-tree *root* object is a hard error.
    async fn build_working_tree_entries(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Result<Vec<DirEntry>> {
        let mut entries = Vec::new();

        let (obj_type, content) = self
            .get_object_content(tree_oid, objects)
            .ok_or_else(|| Error::ObjectNotFound(tree_oid.to_string()))?;

        if obj_type != ObjectType::Tree {
            return Err(Error::InvalidObjectType(format!(
                "expected tree, got {:?}",
                obj_type
            )));
        }

        let tree_entries = parse_tree(&content)?;

        for entry in tree_entries {
            let oid_hex = entry.oid.to_hex();

            if entry.is_tree() {
                // Recurse through the boxed wrapper; async fns cannot call
                // themselves directly.
                let sub_entries = self
                    .build_working_tree_entries_boxed(&oid_hex, objects)
                    .await?;

                let dir_cid =
                    self.tree.put_directory(sub_entries).await.map_err(|e| {
                        Error::StorageError(format!("put dir {}: {}", entry.name, e))
                    })?;

                entries
                    .push(DirEntry::from_cid(&entry.name, &dir_cid).with_link_type(LinkType::Dir));
            } else {
                if let Some((ObjectType::Blob, blob_content)) =
                    self.get_object_content(&oid_hex, objects)
                {
                    let (cid, size) = self.tree.put(&blob_content).await.map_err(|e| {
                        Error::StorageError(format!("put blob {}: {}", entry.name, e))
                    })?;

                    entries.push(DirEntry::from_cid(&entry.name, &cid).with_size(size));
                }
            }
        }

        // Sort for deterministic directory hashing.
        entries.sort_by(|a, b| a.name.cmp(&b.name));

        Ok(entries)
    }
716
    /// Boxed indirection that lets `build_working_tree_entries` recurse
    /// (see the `BoxFuture` alias at the top of the file).
    fn build_working_tree_entries_boxed<'a>(
        &'a self,
        tree_oid: &'a str,
        objects: &'a HashMap<String, Vec<u8>>,
    ) -> BoxFuture<'a, Result<Vec<DirEntry>>> {
        Box::pin(self.build_working_tree_entries(tree_oid, objects))
    }
725
726 async fn build_objects_dir(&self, objects: &HashMap<String, Vec<u8>>) -> Result<Cid> {
728 if objects.is_empty() {
729 let empty_cid = self
731 .tree
732 .put_directory(vec![])
733 .await
734 .map_err(|e| Error::StorageError(format!("put empty objects: {}", e)))?;
735 return Ok(empty_cid);
736 }
737
738 let mut buckets: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
741 for (oid, data) in objects {
742 let prefix = &oid[..2];
743 let suffix = &oid[2..];
744 buckets
745 .entry(prefix.to_string())
746 .or_default()
747 .push((suffix.to_string(), data.clone()));
748 }
749
750 let mut top_entries = Vec::new();
752 for (prefix, objs) in buckets {
753 let mut sub_entries = Vec::new();
754 for (suffix, data) in objs {
755 let (cid, size) = self.tree.put(&data).await.map_err(|e| {
758 Error::StorageError(format!("put object {}{}: {}", prefix, suffix, e))
759 })?;
760 sub_entries.push(DirEntry::from_cid(suffix, &cid).with_size(size));
762 }
763 sub_entries.sort_by(|a, b| a.name.cmp(&b.name));
765
766 let sub_cid = self
767 .tree
768 .put_directory(sub_entries)
769 .await
770 .map_err(|e| Error::StorageError(format!("put objects/{}: {}", prefix, e)))?;
771 top_entries.push(DirEntry::from_cid(prefix, &sub_cid).with_link_type(LinkType::Dir));
772 }
773
774 top_entries.sort_by(|a, b| a.name.cmp(&b.name));
776
777 let bucket_count = top_entries.len();
778 let cid = self
779 .tree
780 .put_directory(top_entries)
781 .await
782 .map_err(|e| Error::StorageError(format!("put objects dir: {}", e)))?;
783
784 debug!(
785 "Built objects dir with {} buckets: {}",
786 bucket_count,
787 hex::encode(cid.hash)
788 );
789 Ok(cid)
790 }
791
    /// Materialize refs as a directory tree: `refs/<category>/<name...>`
    /// with each leaf file holding the raw ref value.
    ///
    /// Entries not matching the "refs/<category>/<rest>" shape (notably
    /// "HEAD") are skipped here — HEAD is emitted separately by
    /// `build_tree`.
    async fn build_refs_dir(&self, refs: &HashMap<String, String>) -> Result<Cid> {
        // category (e.g. "heads", "tags") -> [(name, value)]
        let mut groups: HashMap<String, Vec<(String, String)>> = HashMap::new();

        for (ref_name, value) in refs {
            let parts: Vec<&str> = ref_name.split('/').collect();
            if parts.len() >= 3 && parts[0] == "refs" {
                let category = parts[1].to_string();
                // Names below the category may themselves contain slashes
                // (e.g. "feature/x"), so re-join the remainder.
                let name = parts[2..].join("/");
                groups
                    .entry(category)
                    .or_default()
                    .push((name, value.clone()));
            }
        }

        let mut ref_entries = Vec::new();

        for (category, refs_in_category) in groups {
            let mut cat_entries = Vec::new();
            for (name, value) in refs_in_category {
                let (cid, _size) = self
                    .tree
                    .put(value.as_bytes())
                    .await
                    .map_err(|e| Error::StorageError(format!("put ref: {}", e)))?;
                debug!(
                    "refs/{}/{} -> blob {}",
                    category,
                    name,
                    hex::encode(cid.hash)
                );
                cat_entries.push(DirEntry::from_cid(name, &cid));
            }

            // Sort for deterministic directory hashing.
            cat_entries.sort_by(|a, b| a.name.cmp(&b.name));

            let cat_cid = self
                .tree
                .put_directory(cat_entries)
                .await
                .map_err(|e| Error::StorageError(format!("put {} dir: {}", category, e)))?;
            debug!("refs/{} dir -> {}", category, hex::encode(cat_cid.hash));
            ref_entries.push(DirEntry::from_cid(category, &cat_cid).with_link_type(LinkType::Dir));
        }

        // No recognizable refs: still emit an (empty) refs directory.
        if ref_entries.is_empty() {
            let empty_cid = self
                .tree
                .put_directory(vec![])
                .await
                .map_err(|e| Error::StorageError(format!("put empty refs: {}", e)))?;
            return Ok(empty_cid);
        }

        ref_entries.sort_by(|a, b| a.name.cmp(&b.name));

        let refs_cid = self
            .tree
            .put_directory(ref_entries)
            .await
            .map_err(|e| Error::StorageError(format!("put refs dir: {}", e)))?;
        debug!("refs dir -> {}", hex::encode(refs_cid.hash));
        Ok(refs_cid)
    }
860
    /// Serialize a git index (version 2, "DIRC") covering every blob
    /// reachable from `tree_oid`, so a checkout of the materialized repo
    /// starts with a clean status.
    ///
    /// stat fields (dev/ino/uid/gid, nanoseconds) are zeroed and timestamps
    /// are set to "now", so git re-stats files on first use.
    fn build_index_file(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Result<Vec<u8>> {
        let mut entries: Vec<(String, [u8; 20], u32, u32)> = Vec::new();
        self.collect_tree_entries_for_index(tree_oid, objects, "", &mut entries)?;

        // Index entries must be sorted by path.
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let entry_count = entries.len() as u32;
        debug!("Building git index with {} entries", entry_count);

        let mut index_data = Vec::new();

        // Header: "DIRC" magic, format version 2, entry count (big-endian).
        index_data.extend_from_slice(b"DIRC");
        index_data.extend_from_slice(&2u32.to_be_bytes());
        index_data.extend_from_slice(&entry_count.to_be_bytes());

        let now_sec = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as u32;

        for (path, sha1, mode, size) in &entries {
            let entry_start = index_data.len();

            // ctime seconds + nanoseconds (nsec zeroed).
            index_data.extend_from_slice(&now_sec.to_be_bytes());
            index_data.extend_from_slice(&0u32.to_be_bytes());
            // mtime seconds + nanoseconds (nsec zeroed).
            index_data.extend_from_slice(&now_sec.to_be_bytes());
            index_data.extend_from_slice(&0u32.to_be_bytes());
            // dev, ino: not tracked.
            index_data.extend_from_slice(&0u32.to_be_bytes());
            index_data.extend_from_slice(&0u32.to_be_bytes());
            // File mode from the tree entry.
            index_data.extend_from_slice(&mode.to_be_bytes());
            // uid, gid: not tracked.
            index_data.extend_from_slice(&0u32.to_be_bytes());
            index_data.extend_from_slice(&0u32.to_be_bytes());
            // On-disk file size.
            index_data.extend_from_slice(&size.to_be_bytes());
            // Object sha1 (20 bytes).
            index_data.extend_from_slice(sha1);
            // Flags: low 12 bits are the path length, capped at 0xFFF per
            // the index format.
            let path_len = std::cmp::min(path.len(), 0xFFF) as u16;
            index_data.extend_from_slice(&path_len.to_be_bytes());
            index_data.extend_from_slice(path.as_bytes());
            index_data.push(0);
            // NUL-pad so each entry's total length is a multiple of 8
            // (the push above guarantees at least one terminating NUL).
            let entry_len = index_data.len() - entry_start;
            let padding = (8 - (entry_len % 8)) % 8;
            index_data.extend(std::iter::repeat_n(0, padding));
        }

        // Trailing SHA-1 checksum over everything written so far.
        let mut hasher = Sha1::new();
        hasher.update(&index_data);
        let checksum = hasher.finalize();
        index_data.extend_from_slice(&checksum);

        debug!(
            "Built git index: {} bytes, {} entries",
            index_data.len(),
            entry_count
        );
        Ok(index_data)
    }
939
    /// Walk the git tree `tree_oid` depth-first, appending one
    /// `(path, sha1, mode, size)` tuple per reachable blob to `entries`.
    /// `prefix` is the slash-joined path of the ancestors ("" at the root).
    fn collect_tree_entries_for_index(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
        prefix: &str,
        entries: &mut Vec<(String, [u8; 20], u32, u32)>,
    ) -> Result<()> {
        let (obj_type, content) = self
            .get_object_content(tree_oid, objects)
            .ok_or_else(|| Error::ObjectNotFound(tree_oid.to_string()))?;

        if obj_type != ObjectType::Tree {
            return Err(Error::InvalidObjectType(format!(
                "expected tree, got {:?}",
                obj_type
            )));
        }

        let tree_entries = parse_tree(&content)?;

        for entry in tree_entries {
            let path = if prefix.is_empty() {
                entry.name.clone()
            } else {
                format!("{}/{}", prefix, entry.name)
            };

            let oid_hex = entry.oid.to_hex();

            if entry.is_tree() {
                self.collect_tree_entries_for_index(&oid_hex, objects, &path, entries)?;
            } else {
                // Missing or non-blob objects are skipped (best effort),
                // mirroring build_working_tree_entries.
                if let Some((ObjectType::Blob, blob_content)) =
                    self.get_object_content(&oid_hex, objects)
                {
                    // A 40-char hex oid decodes to exactly 20 bytes; any
                    // other shape leaves the sha zeroed rather than failing.
                    let mut sha1_bytes = [0u8; 20];
                    if let Ok(bytes) = hex::decode(&oid_hex) {
                        if bytes.len() == 20 {
                            sha1_bytes.copy_from_slice(&bytes);
                        }
                    }

                    // NOTE(review): assumes entry.mode is already the git
                    // on-disk mode value (e.g. 0o100644) — confirm against
                    // parse_tree.
                    let mode = entry.mode;
                    let size = blob_content.len() as u32;

                    entries.push((path, sha1_bytes, mode, size));
                }
            }
        }

        Ok(())
    }
997
    /// Borrow the underlying shared blob store.
    pub fn store(&self) -> &Arc<LocalStore> {
        &self.store
    }
1002
    #[allow(dead_code)]
    /// Borrow the hashtree built on top of the blob store.
    pub fn hashtree(&self) -> &HashTree<LocalStore> {
        &self.tree
    }
1008
    #[allow(dead_code)]
    /// Upload every local blob to the Blossom file servers, skipping blobs
    /// the servers already have. Returns `(newly_uploaded, already_present)`
    /// counts. Individual read/upload failures are logged and skipped —
    /// only a failure to list local blobs is fatal.
    pub fn push_to_file_servers(
        &self,
        blossom: &hashtree_blossom::BlossomClient,
    ) -> Result<(usize, usize)> {
        let hashes = self
            .store
            .list()
            .map_err(|e| Error::StorageError(format!("list hashes: {}", e)))?;

        info!("Pushing {} blobs to file servers", hashes.len());

        let mut uploaded = 0;
        let mut existed = 0;

        self.runtime.block_on(async {
            for hash in &hashes {
                let hex_hash = hex::encode(hash);
                // A blob that disappeared (or errored) since `list` is
                // simply skipped.
                let data = match self.store.get_sync(hash) {
                    Ok(Some(d)) => d,
                    _ => continue,
                };

                match blossom.upload_if_missing(&data).await {
                    Ok((_, true)) => {
                        debug!("Uploaded {}", &hex_hash[..12]);
                        uploaded += 1;
                    }
                    Ok((_, false)) => {
                        existed += 1;
                    }
                    Err(e) => {
                        debug!("Failed to upload {}: {}", &hex_hash[..12], e);
                    }
                }
            }
        });

        info!(
            "Upload complete: {} new, {} already existed",
            uploaded, existed
        );
        Ok((uploaded, existed))
    }
1054
1055 #[allow(dead_code)]
1057 pub fn clear(&self) -> Result<()> {
1058 let mut objects = self
1059 .objects
1060 .write()
1061 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
1062 let mut refs = self
1063 .refs
1064 .write()
1065 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
1066 let mut root = self
1067 .root_cid
1068 .write()
1069 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
1070
1071 objects.clear();
1072 refs.clear();
1073 *root = None;
1074 Ok(())
1075 }
1076}
1077
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Storage rooted in a fresh temp dir, config-default size limit.
    fn create_test_storage() -> (GitStorage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage = GitStorage::open(temp_dir.path()).unwrap();
        (storage, temp_dir)
    }

    /// Storage with an explicit blob-cache limit in bytes.
    fn create_test_storage_with_limit(max_size_bytes: u64) -> (GitStorage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage = GitStorage::open_with_max_bytes(temp_dir.path(), max_size_bytes).unwrap();
        (storage, temp_dir)
    }

    /// Total bytes currently held by the local blob store, whichever
    /// backend is active.
    fn local_total_bytes(storage: &GitStorage) -> u64 {
        match storage.store().as_ref() {
            LocalStore::Fs(store) => store.stats().unwrap().total_bytes,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.stats().unwrap().total_bytes,
        }
    }

    #[test]
    fn test_import_ref() {
        let (storage, _temp) = create_test_storage();

        storage
            .import_ref("refs/heads/main", "abc123def456")
            .unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());

        // The raw imported value is stored verbatim.
        let refs = storage.list_refs().unwrap();
        assert_eq!(
            refs.get("refs/heads/main"),
            Some(&"abc123def456".to_string())
        );
    }

    #[test]
    fn test_import_multiple_refs_preserves_all() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage.import_ref("refs/heads/dev", "sha_dev").unwrap();
        storage
            .import_ref("refs/heads/feature", "sha_feature")
            .unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());
        assert!(storage.has_ref("refs/heads/dev").unwrap());
        assert!(storage.has_ref("refs/heads/feature").unwrap());

        // Writing a brand-new ref must not clobber the imported ones.
        storage
            .write_ref(
                "refs/heads/new-branch",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        let refs = storage.list_refs().unwrap();
        assert_eq!(refs.len(), 4);
        assert!(refs.contains_key("refs/heads/main"));
        assert!(refs.contains_key("refs/heads/dev"));
        assert!(refs.contains_key("refs/heads/feature"));
        assert!(refs.contains_key("refs/heads/new-branch"));
    }

    #[test]
    fn test_import_compressed_object() {
        let (storage, _temp) = create_test_storage();

        // 0x78 0x9c is the common zlib header; the rest is dummy payload —
        // import does not validate the compressed bytes.
        let fake_compressed = vec![0x78, 0x9c, 0x01, 0x02, 0x03];
        storage
            .import_compressed_object("abc123def456", fake_compressed.clone())
            .unwrap();

        assert_eq!(storage.object_count().unwrap(), 1);
    }

    #[test]
    fn test_write_ref_overwrites_imported() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "old_sha").unwrap();

        storage
            .write_ref(
                "refs/heads/main",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        // The write replaces the imported value for the same name.
        let refs = storage.list_refs().unwrap();
        assert_eq!(
            refs.get("refs/heads/main"),
            Some(&"0123456789abcdef0123456789abcdef01234567".to_string())
        );
    }

    #[test]
    fn test_delete_ref_preserves_others() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage.import_ref("refs/heads/dev", "sha_dev").unwrap();

        storage.delete_ref("refs/heads/dev").unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());
        assert!(!storage.has_ref("refs/heads/dev").unwrap());
    }

    #[test]
    fn test_clear_removes_all() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage
            .import_compressed_object("obj1", vec![1, 2, 3])
            .unwrap();

        storage.clear().unwrap();

        assert!(!storage.has_ref("refs/heads/main").unwrap());
        assert_eq!(storage.object_count().unwrap(), 0);
    }

    /// End-to-end: build a tree larger than the limit, then verify eviction
    /// brings the store back under it.
    #[test]
    fn test_evict_if_needed_respects_configured_limit() {
        let (storage, _temp) = create_test_storage_with_limit(1_024);

        storage
            .write_raw_object(ObjectType::Blob, &vec![b'a'; 900])
            .unwrap();
        storage
            .write_raw_object(ObjectType::Blob, &vec![b'b'; 900])
            .unwrap();
        storage
            .write_ref(
                "refs/heads/main",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        storage.build_tree().unwrap();

        let before = local_total_bytes(&storage);
        assert!(before > 1_024);

        let freed = storage.evict_if_needed().unwrap();
        assert!(freed > 0);

        let after = local_total_bytes(&storage);
        assert!(after <= 1_024);
    }
}