1use flate2::read::ZlibDecoder;
13use flate2::write::ZlibEncoder;
14use flate2::Compression;
15use hashtree_config::{Config, StorageBackend};
16use hashtree_core::store::{Store, StoreError, StoreStats};
17use hashtree_core::types::Hash;
18use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType};
19use hashtree_fs::FsBlobStore;
20#[cfg(feature = "lmdb")]
21use hashtree_lmdb::LmdbBlobStore;
22use sha1::{Digest, Sha1};
23use std::collections::HashMap;
24use std::io::{Read, Write};
25use std::path::Path;
26use std::sync::Arc;
27use tokio::runtime::{Handle, Runtime};
28use tracing::{debug, info};
29
30use super::object::{parse_tree, GitObject, ObjectId, ObjectType};
31use super::refs::{validate_ref_name, Ref};
32use super::{Error, Result};
33
/// Boxed `Send` future; needed because async fns cannot recurse directly
/// (see `build_working_tree_entries_boxed`).
type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
36
/// How synchronous callers drive async store/tree operations: either on a
/// runtime this type owns, or on a handle to an already-running runtime.
enum RuntimeExecutor {
    /// Runtime created and owned here (no ambient runtime was active).
    Owned(Runtime),
    /// Handle to the caller's already-running runtime.
    Handle(Handle),
}
42
impl RuntimeExecutor {
    /// Run future `f` to completion, blocking the current thread.
    ///
    /// The `Handle` arm uses `block_in_place` so the blocked worker thread is
    /// compensated for. NOTE(review): `block_in_place` panics on a
    /// current-thread runtime — confirm handles only come from multi-threaded
    /// runtimes.
    fn block_on<F: std::future::Future>(&self, f: F) -> F::Output {
        match self {
            RuntimeExecutor::Owned(rt) => rt.block_on(f),
            RuntimeExecutor::Handle(handle) => tokio::task::block_in_place(|| handle.block_on(f)),
        }
    }
}
51
/// Local content-addressed blob store, backed by the filesystem or — when the
/// `lmdb` feature is enabled — by LMDB.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
58
59impl LocalStore {
60 fn new_for_backend<P: AsRef<Path>>(
61 path: P,
62 backend: StorageBackend,
63 max_bytes: u64,
64 ) -> std::result::Result<Self, StoreError> {
65 match backend {
66 StorageBackend::Fs => {
67 if max_bytes > 0 {
68 Ok(LocalStore::Fs(FsBlobStore::with_max_bytes(
69 path, max_bytes,
70 )?))
71 } else {
72 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
73 }
74 }
75 #[cfg(feature = "lmdb")]
76 StorageBackend::Lmdb => {
77 if max_bytes > 0 {
78 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
79 path, max_bytes,
80 )?))
81 } else {
82 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
83 }
84 }
85 #[cfg(not(feature = "lmdb"))]
86 StorageBackend::Lmdb => {
87 warn!(
88 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
89 );
90 if max_bytes > 0 {
91 Ok(LocalStore::Fs(FsBlobStore::with_max_bytes(
92 path, max_bytes,
93 )?))
94 } else {
95 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
96 }
97 }
98 }
99 }
100
101 pub fn new<P: AsRef<Path>>(path: P) -> std::result::Result<Self, StoreError> {
103 Self::new_with_max_bytes(path, 0)
104 }
105
106 pub fn new_with_max_bytes<P: AsRef<Path>>(
108 path: P,
109 max_bytes: u64,
110 ) -> std::result::Result<Self, StoreError> {
111 let config = Config::load_or_default();
112 Self::new_for_backend(path, config.storage.backend, max_bytes)
113 }
114
115 pub fn list(&self) -> std::result::Result<Vec<Hash>, StoreError> {
117 match self {
118 LocalStore::Fs(store) => store.list(),
119 #[cfg(feature = "lmdb")]
120 LocalStore::Lmdb(store) => store.list(),
121 }
122 }
123
124 pub fn get_sync(&self, hash: &Hash) -> std::result::Result<Option<Vec<u8>>, StoreError> {
126 match self {
127 LocalStore::Fs(store) => store.get_sync(hash),
128 #[cfg(feature = "lmdb")]
129 LocalStore::Lmdb(store) => store.get_sync(hash),
130 }
131 }
132}
133
#[async_trait::async_trait]
impl Store for LocalStore {
    // Every method delegates verbatim to the active backend variant; arms
    // differ only by the `lmdb` feature gate.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put(hash, data).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put(hash, data).await,
        }
    }

    async fn get(&self, hash: &Hash) -> std::result::Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get(hash).await,
        }
    }

    async fn has(&self, hash: &Hash) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.has(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.has(hash).await,
        }
    }

    async fn delete(&self, hash: &Hash) -> std::result::Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete(hash).await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete(hash).await,
        }
    }

    fn set_max_bytes(&self, max: u64) {
        match self {
            LocalStore::Fs(store) => store.set_max_bytes(max),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.set_max_bytes(max),
        }
    }

    fn max_bytes(&self) -> Option<u64> {
        match self {
            LocalStore::Fs(store) => store.max_bytes(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.max_bytes(),
        }
    }

    async fn stats(&self) -> StoreStats {
        // Backend stats calls are synchronous; failures degrade to zeroed
        // stats rather than surfacing an error.
        match self {
            LocalStore::Fs(store) => match store.stats() {
                Ok(stats) => StoreStats {
                    count: stats.count as u64,
                    bytes: stats.total_bytes,
                    pinned_count: stats.pinned_count as u64,
                    pinned_bytes: stats.pinned_bytes,
                },
                Err(_) => StoreStats::default(),
            },
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => match store.stats() {
                Ok(stats) => StoreStats {
                    count: stats.count as u64,
                    bytes: stats.total_bytes,
                    // The LMDB backend does not report pinning.
                    pinned_count: 0,
                    pinned_bytes: 0,
                },
                Err(_) => StoreStats::default(),
            },
        }
    }

    async fn evict_if_needed(&self) -> std::result::Result<u64, StoreError> {
        match self {
            LocalStore::Fs(store) => store.evict_if_needed().await,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.evict_if_needed().await,
        }
    }
}
216
/// Git repository storage layered on a content-addressed hashtree.
///
/// Loose objects and refs are cached in memory; `build_tree` publishes them
/// as a hashtree snapshot resembling a checked-out repository.
pub struct GitStorage {
    /// Shared blob store backing the hashtree nodes.
    store: Arc<LocalStore>,
    /// Hashtree used to build the published directory snapshot.
    tree: HashTree<LocalStore>,
    /// Bridge for driving async store/tree calls from sync methods.
    runtime: RuntimeExecutor,
    /// zlib-compressed loose git objects keyed by hex object id.
    objects: std::sync::RwLock<HashMap<String, Vec<u8>>>,
    /// Ref values keyed by full ref name; symbolic refs stored as
    /// "ref: <target>".
    refs: std::sync::RwLock<HashMap<String, String>>,
    /// Cached root CID of the last built snapshot; cleared on any mutation.
    root_cid: std::sync::RwLock<Option<Cid>>,
}
228
229impl GitStorage {
    /// Open storage at `path`, deriving the size cap from the loaded config
    /// (`max_size_gb` converted to bytes; saturating, 0 = unlimited).
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let config = Config::load_or_default();
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::open_with_max_bytes(path, max_size_bytes)
    }
239
    /// Open storage at `path` with an explicit byte cap, backend chosen by
    /// the loaded config.
    pub fn open_with_max_bytes(path: impl AsRef<Path>, max_size_bytes: u64) -> Result<Self> {
        let config = Config::load_or_default();
        Self::open_with_backend_and_max_bytes(path, config.storage.backend, max_size_bytes)
    }
245
    /// Open storage at `path` with an explicit backend and byte cap.
    ///
    /// Reuses the ambient tokio runtime when one is running; otherwise
    /// creates and owns a new runtime so synchronous callers can still drive
    /// the async store operations.
    pub fn open_with_backend_and_max_bytes(
        path: impl AsRef<Path>,
        backend: StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let runtime = match Handle::try_current() {
            Ok(handle) => RuntimeExecutor::Handle(handle),
            Err(_) => {
                let rt = Runtime::new()
                    .map_err(|e| Error::StorageError(format!("tokio runtime: {}", e)))?;
                RuntimeExecutor::Owned(rt)
            }
        };

        // Blobs live under "<path>/blobs".
        let store_path = path.as_ref().join("blobs");
        let store = Arc::new(
            LocalStore::new_for_backend(&store_path, backend, max_size_bytes)
                .map_err(|e| Error::StorageError(format!("local store: {}", e)))?,
        );

        let tree = HashTree::new(HashTreeConfig::new(store.clone()));

        Ok(Self {
            store,
            tree,
            runtime,
            objects: std::sync::RwLock::new(HashMap::new()),
            refs: std::sync::RwLock::new(HashMap::new()),
            root_cid: std::sync::RwLock::new(None),
        })
    }
278
    /// Synchronously run the store's eviction pass; returns bytes freed.
    pub fn evict_if_needed(&self) -> Result<u64> {
        self.runtime
            .block_on(self.store.evict_if_needed())
            .map_err(|e| Error::StorageError(format!("evict: {}", e)))
    }
285
    /// Compress `obj` in git loose-object format and cache it in memory under
    /// its hex object id. Invalidates the cached root CID.
    fn write_object(&self, obj: &GitObject) -> Result<ObjectId> {
        let oid = obj.id();
        let key = oid.to_hex();

        // Loose format is "<type> <len>\0<content>", zlib-compressed.
        let loose = obj.to_loose_format();
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(&loose)?;
        let compressed = encoder.finish()?;

        let mut objects = self
            .objects
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        objects.insert(key, compressed);

        // Any mutation invalidates the previously built snapshot.
        if let Ok(mut root) = self.root_cid.write() {
            *root = None;
        }

        Ok(oid)
    }
309
310 pub fn write_raw_object(&self, obj_type: ObjectType, content: &[u8]) -> Result<ObjectId> {
312 let obj = GitObject::new(obj_type, content.to_vec());
313 self.write_object(&obj)
314 }
315
    /// Decompress and parse the cached loose object with id `oid`.
    ///
    /// Errors with `ObjectNotFound` when the id is not cached.
    #[allow(dead_code)]
    fn read_object(&self, oid: &ObjectId) -> Result<GitObject> {
        let key = oid.to_hex();
        let objects = self
            .objects
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let compressed = objects
            .get(&key)
            .ok_or_else(|| Error::ObjectNotFound(key.clone()))?;

        let mut decoder = ZlibDecoder::new(compressed.as_slice());
        let mut data = Vec::new();
        decoder.read_to_end(&mut data)?;

        GitObject::from_loose_format(&data)
    }
334
    /// Validate `name` and store the ref; symbolic refs serialize as
    /// "ref: <target>". Invalidates the cached root CID.
    pub fn write_ref(&self, name: &str, target: &Ref) -> Result<()> {
        validate_ref_name(name)?;

        let value = match target {
            Ref::Direct(oid) => oid.to_hex(),
            Ref::Symbolic(target) => format!("ref: {}", target),
        };

        let mut refs = self
            .refs
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        refs.insert(name.to_string(), value);

        if let Ok(mut root) = self.root_cid.write() {
            *root = None;
        }

        Ok(())
    }
357
358 #[allow(dead_code)]
360 pub fn read_ref(&self, name: &str) -> Result<Option<Ref>> {
361 let refs = self
362 .refs
363 .read()
364 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
365
366 match refs.get(name) {
367 Some(value) => {
368 if let Some(target) = value.strip_prefix("ref: ") {
369 Ok(Some(Ref::Symbolic(target.to_string())))
370 } else {
371 let oid = ObjectId::from_hex(value)
372 .ok_or_else(|| Error::StorageError(format!("invalid ref: {}", value)))?;
373 Ok(Some(Ref::Direct(oid)))
374 }
375 }
376 None => Ok(None),
377 }
378 }
379
380 #[allow(dead_code)]
382 pub fn list_refs(&self) -> Result<HashMap<String, String>> {
383 let refs = self
384 .refs
385 .read()
386 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
387 Ok(refs.clone())
388 }
389
    /// Remove `name` from the ref table; returns whether it existed.
    /// Invalidates the cached root CID either way.
    pub fn delete_ref(&self, name: &str) -> Result<bool> {
        let mut refs = self
            .refs
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let existed = refs.remove(name).is_some();

        if let Ok(mut root) = self.root_cid.write() {
            *root = None;
        }

        Ok(existed)
    }
405
    /// Insert an already-zlib-compressed loose object under `oid`.
    ///
    /// NOTE(review): neither the id nor the payload is validated here — this
    /// assumes well-formed data from a trusted import path; confirm callers.
    pub fn import_compressed_object(&self, oid: &str, compressed_data: Vec<u8>) -> Result<()> {
        let mut objects = self
            .objects
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        objects.insert(oid.to_string(), compressed_data);

        // Any mutation invalidates the cached snapshot.
        if let Ok(mut root) = self.root_cid.write() {
            *root = None;
        }

        Ok(())
    }
422
    /// Insert a raw ref value (no validation, unlike `write_ref`).
    /// Invalidates the cached root CID.
    pub fn import_ref(&self, name: &str, value: &str) -> Result<()> {
        let mut refs = self
            .refs
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        refs.insert(name.to_string(), value.to_string());

        if let Ok(mut root) = self.root_cid.write() {
            *root = None;
        }

        Ok(())
    }
438
    /// Test helper: whether a ref with this exact name exists.
    #[cfg(test)]
    pub fn has_ref(&self, name: &str) -> Result<bool> {
        let refs = self
            .refs
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        Ok(refs.contains_key(name))
    }
448
    /// Test helper: number of cached loose objects.
    #[cfg(test)]
    pub fn object_count(&self) -> Result<usize> {
        let objects = self
            .objects
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        Ok(objects.len())
    }
458
    /// The cached root CID of the last `build_tree`, if still valid.
    #[allow(dead_code)]
    pub fn get_root_cid(&self) -> Result<Option<Cid>> {
        let root = self
            .root_cid
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        Ok(root.clone())
    }
468
469 #[allow(dead_code)]
471 pub fn default_branch(&self) -> Result<Option<String>> {
472 let refs = self
473 .refs
474 .read()
475 .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
476
477 if let Some(head) = refs.get("HEAD") {
478 if let Some(target) = head.strip_prefix("ref: ") {
479 return Ok(Some(target.to_string()));
480 }
481 }
482 Ok(None)
483 }
484
    /// Extract the root tree id from a cached commit object.
    ///
    /// Returns `None` on any lookup or parse failure (missing object, bad
    /// zlib, non-UTF-8 body, or no leading "tree <sha>" line).
    fn get_commit_tree(
        &self,
        commit_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Option<String> {
        let compressed = objects.get(commit_oid)?;

        let mut decoder = ZlibDecoder::new(&compressed[..]);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).ok()?;

        // Skip the "<type> <len>\0" loose-object header.
        let null_pos = decompressed.iter().position(|&b| b == 0)?;
        let content = &decompressed[null_pos + 1..];

        // The first line of a commit body is "tree <sha>".
        let content_str = std::str::from_utf8(content).ok()?;
        let first_line = content_str.lines().next()?;
        first_line
            .strip_prefix("tree ")
            .map(|tree_hash| tree_hash.to_string())
    }
509
    /// Decompress a cached loose object and split it into (type, content).
    ///
    /// Only blob/tree/commit headers are recognized; anything else (e.g. a
    /// tag object) yields `None`, as does any lookup or decode failure.
    fn get_object_content(
        &self,
        oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Option<(ObjectType, Vec<u8>)> {
        let compressed = objects.get(oid)?;

        let mut decoder = ZlibDecoder::new(&compressed[..]);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).ok()?;

        // Header is "<type> <len>" before the NUL separator.
        let null_pos = decompressed.iter().position(|&b| b == 0)?;
        let header = std::str::from_utf8(&decompressed[..null_pos]).ok()?;
        let obj_type = if header.starts_with("blob") {
            ObjectType::Blob
        } else if header.starts_with("tree") {
            ObjectType::Tree
        } else if header.starts_with("commit") {
            ObjectType::Commit
        } else {
            return None;
        };
        let content = decompressed[null_pos + 1..].to_vec();
        Some((obj_type, content))
    }
538
    /// Build (or return the cached) hashtree snapshot of the repository: a
    /// root directory containing a bare `.git` layout (HEAD, objects, refs,
    /// optional config and index) plus the checked-out working-tree files.
    pub fn build_tree(&self) -> Result<Cid> {
        // Fast path: reuse the cached root if nothing was mutated since.
        if let Ok(root) = self.root_cid.read() {
            if let Some(ref cid) = *root {
                return Ok(cid.clone());
            }
        }

        // NOTE(review): these std read locks stay held across block_on below;
        // fine on this thread, but confirm no writer can block meanwhile.
        let objects = self
            .objects
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let refs = self
            .refs
            .read()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;

        // Resolve the default branch and its tip commit: prefer HEAD, else
        // fall back to the first "refs/heads/*" entry encountered.
        let (default_branch, commit_sha) = if let Some(head) = refs.get("HEAD") {
            let branch = head.strip_prefix("ref: ").map(String::from);
            let sha = branch.as_ref().and_then(|b| refs.get(b)).cloned();
            (branch, sha)
        } else {
            let mut branch_info: Option<(String, String)> = None;
            for (ref_name, sha) in refs.iter() {
                if ref_name.starts_with("refs/heads/") {
                    branch_info = Some((ref_name.clone(), sha.clone()));
                    break;
                }
            }
            match branch_info {
                Some((branch, sha)) => (Some(branch), Some(sha)),
                None => (None, None),
            }
        };

        // Root tree of the tip commit, when it parses.
        let tree_sha = commit_sha
            .as_ref()
            .and_then(|sha| self.get_commit_tree(sha, &objects));

        let objects_clone = objects.clone();

        let root_cid = self.runtime.block_on(async {
            let objects_cid = self.build_objects_dir(&objects).await?;

            let refs_cid = self.build_refs_dir(&refs).await?;

            // HEAD must be newline-terminated; default to main when absent.
            let head_content = refs.get("HEAD")
                .map(|h| if h.ends_with('\n') { h.clone() } else { format!("{}\n", h) })
                .or_else(|| default_branch.as_ref().map(|b| format!("ref: {}\n", b)))
                .unwrap_or_else(|| "ref: refs/heads/main\n".to_string());
            debug!("HEAD content: {:?}", head_content);
            let (head_cid, head_size) = self.tree.put(head_content.as_bytes()).await
                .map_err(|e| Error::StorageError(format!("put HEAD: {}", e)))?;
            debug!("HEAD hash: {}", hex::encode(head_cid.hash));

            let mut git_entries = vec![
                DirEntry::from_cid("HEAD", &head_cid).with_size(head_size),
                DirEntry::from_cid("objects", &objects_cid).with_link_type(LinkType::Dir),
                DirEntry::from_cid("refs", &refs_cid).with_link_type(LinkType::Dir),
            ];

            // Minimal bare-repo config recording the default branch.
            if let Some(ref branch) = default_branch {
                let config = format!(
                    "[core]\n\trepositoryformatversion = 0\n\tfilemode = true\n\tbare = true\n[init]\n\tdefaultBranch = {}\n",
                    branch.trim_start_matches("refs/heads/")
                );
                let (config_cid, config_size) = self.tree.put(config.as_bytes()).await
                    .map_err(|e| Error::StorageError(format!("put config: {}", e)))?;
                git_entries.push(DirEntry::from_cid("config", &config_cid).with_size(config_size));
            }

            // Best effort: include a git index matching the working tree;
            // index build failures are logged and skipped, not fatal.
            if let Some(ref tree_oid) = tree_sha {
                match self.build_index_file(tree_oid, &objects_clone) {
                    Ok(index_data) => {
                        let (index_cid, index_size) = self.tree.put(&index_data).await
                            .map_err(|e| Error::StorageError(format!("put index: {}", e)))?;
                        git_entries.push(DirEntry::from_cid("index", &index_cid).with_size(index_size));
                        info!("Added git index file ({} bytes)", index_data.len());
                    }
                    Err(e) => {
                        debug!("Failed to build git index file: {} - continuing without index", e);
                    }
                }
            }

            let git_cid = self.tree.put_directory(git_entries).await
                .map_err(|e| Error::StorageError(format!("put .git: {}", e)))?;

            let mut root_entries = vec![DirEntry::from_cid(".git", &git_cid).with_link_type(LinkType::Dir)];

            // Materialize the working-tree files next to .git.
            if let Some(ref tree_oid) = tree_sha {
                let working_tree_entries = self.build_working_tree_entries(tree_oid, &objects_clone).await?;
                root_entries.extend(working_tree_entries);
                info!("Added {} working tree entries to root", root_entries.len() - 1);
            }

            // Directory entries are stored sorted by name.
            root_entries.sort_by(|a, b| a.name.cmp(&b.name));

            let root_cid = self.tree.put_directory(root_entries).await
                .map_err(|e| Error::StorageError(format!("put root: {}", e)))?;

            info!("Built hashtree root: {} (encrypted: {}) (.git dir: {})",
                hex::encode(root_cid.hash),
                root_cid.key.is_some(),
                hex::encode(git_cid.hash));

            Ok::<Cid, Error>(root_cid)
        })?;

        // Cache until the next mutation clears it.
        if let Ok(mut root) = self.root_cid.write() {
            *root = Some(root_cid.clone());
        }

        Ok(root_cid)
    }
671
    /// Convert the git tree `tree_oid` into hashtree directory entries,
    /// recursing into subtrees and storing blob contents in the hashtree.
    ///
    /// Errors when `tree_oid` is missing or is not a tree object.
    async fn build_working_tree_entries(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Result<Vec<DirEntry>> {
        let mut entries = Vec::new();

        let (obj_type, content) = self
            .get_object_content(tree_oid, objects)
            .ok_or_else(|| Error::ObjectNotFound(tree_oid.to_string()))?;

        if obj_type != ObjectType::Tree {
            return Err(Error::InvalidObjectType(format!(
                "expected tree, got {:?}",
                obj_type
            )));
        }

        let tree_entries = parse_tree(&content)?;

        for entry in tree_entries {
            let oid_hex = entry.oid.to_hex();

            if entry.is_tree() {
                // Recurse through the boxed wrapper (async fns cannot
                // self-recurse directly).
                let sub_entries = self
                    .build_working_tree_entries_boxed(&oid_hex, objects)
                    .await?;

                let dir_cid =
                    self.tree.put_directory(sub_entries).await.map_err(|e| {
                        Error::StorageError(format!("put dir {}: {}", entry.name, e))
                    })?;

                entries
                    .push(DirEntry::from_cid(&entry.name, &dir_cid).with_link_type(LinkType::Dir));
            } else {
                // Entries that don't resolve to a blob (e.g. submodule
                // commits, missing objects) are silently skipped.
                if let Some((ObjectType::Blob, blob_content)) =
                    self.get_object_content(&oid_hex, objects)
                {
                    let (cid, size) = self.tree.put(&blob_content).await.map_err(|e| {
                        Error::StorageError(format!("put blob {}: {}", entry.name, e))
                    })?;

                    entries.push(DirEntry::from_cid(&entry.name, &cid).with_size(size));
                }
            }
        }

        entries.sort_by(|a, b| a.name.cmp(&b.name));

        Ok(entries)
    }
734
    /// Boxed wrapper that makes `build_working_tree_entries` recursion
    /// possible (direct async recursion would create an infinitely sized
    /// future type).
    fn build_working_tree_entries_boxed<'a>(
        &'a self,
        tree_oid: &'a str,
        objects: &'a HashMap<String, Vec<u8>>,
    ) -> BoxFuture<'a, Result<Vec<DirEntry>>> {
        Box::pin(self.build_working_tree_entries(tree_oid, objects))
    }
743
744 async fn build_objects_dir(&self, objects: &HashMap<String, Vec<u8>>) -> Result<Cid> {
746 if objects.is_empty() {
747 let empty_cid = self
749 .tree
750 .put_directory(vec![])
751 .await
752 .map_err(|e| Error::StorageError(format!("put empty objects: {}", e)))?;
753 return Ok(empty_cid);
754 }
755
756 let mut buckets: HashMap<String, Vec<(String, Vec<u8>)>> = HashMap::new();
759 for (oid, data) in objects {
760 let prefix = &oid[..2];
761 let suffix = &oid[2..];
762 buckets
763 .entry(prefix.to_string())
764 .or_default()
765 .push((suffix.to_string(), data.clone()));
766 }
767
768 let mut top_entries = Vec::new();
770 for (prefix, objs) in buckets {
771 let mut sub_entries = Vec::new();
772 for (suffix, data) in objs {
773 let (cid, size) = self.tree.put(&data).await.map_err(|e| {
776 Error::StorageError(format!("put object {}{}: {}", prefix, suffix, e))
777 })?;
778 sub_entries.push(DirEntry::from_cid(suffix, &cid).with_size(size));
780 }
781 sub_entries.sort_by(|a, b| a.name.cmp(&b.name));
783
784 let sub_cid = self
785 .tree
786 .put_directory(sub_entries)
787 .await
788 .map_err(|e| Error::StorageError(format!("put objects/{}: {}", prefix, e)))?;
789 top_entries.push(DirEntry::from_cid(prefix, &sub_cid).with_link_type(LinkType::Dir));
790 }
791
792 top_entries.sort_by(|a, b| a.name.cmp(&b.name));
794
795 let bucket_count = top_entries.len();
796 let cid = self
797 .tree
798 .put_directory(top_entries)
799 .await
800 .map_err(|e| Error::StorageError(format!("put objects dir: {}", e)))?;
801
802 debug!(
803 "Built objects dir with {} buckets: {}",
804 bucket_count,
805 hex::encode(cid.hash)
806 );
807 Ok(cid)
808 }
809
    /// Store refs in the hashtree as `refs/<category>/<name>` files and
    /// return the `refs` directory CID.
    ///
    /// Only refs shaped like "refs/<category>/<rest...>" are included; other
    /// keys (e.g. "HEAD") are skipped here and handled by the caller.
    async fn build_refs_dir(&self, refs: &HashMap<String, String>) -> Result<Cid> {
        // Group by category ("heads", "tags", ...), keeping the remainder of
        // the path (which may itself contain '/') as the entry name.
        let mut groups: HashMap<String, Vec<(String, String)>> = HashMap::new();

        for (ref_name, value) in refs {
            let parts: Vec<&str> = ref_name.split('/').collect();
            if parts.len() >= 3 && parts[0] == "refs" {
                let category = parts[1].to_string();
                let name = parts[2..].join("/");
                groups
                    .entry(category)
                    .or_default()
                    .push((name, value.clone()));
            }
        }

        let mut ref_entries = Vec::new();

        for (category, refs_in_category) in groups {
            let mut cat_entries = Vec::new();
            for (name, value) in refs_in_category {
                let (cid, _size) = self
                    .tree
                    .put(value.as_bytes())
                    .await
                    .map_err(|e| Error::StorageError(format!("put ref: {}", e)))?;
                debug!(
                    "refs/{}/{} -> blob {}",
                    category,
                    name,
                    hex::encode(cid.hash)
                );
                cat_entries.push(DirEntry::from_cid(name, &cid));
            }

            cat_entries.sort_by(|a, b| a.name.cmp(&b.name));

            let cat_cid = self
                .tree
                .put_directory(cat_entries)
                .await
                .map_err(|e| Error::StorageError(format!("put {} dir: {}", category, e)))?;
            debug!("refs/{} dir -> {}", category, hex::encode(cat_cid.hash));
            ref_entries.push(DirEntry::from_cid(category, &cat_cid).with_link_type(LinkType::Dir));
        }

        // No qualifying refs: publish an empty directory.
        if ref_entries.is_empty() {
            let empty_cid = self
                .tree
                .put_directory(vec![])
                .await
                .map_err(|e| Error::StorageError(format!("put empty refs: {}", e)))?;
            return Ok(empty_cid);
        }

        ref_entries.sort_by(|a, b| a.name.cmp(&b.name));

        let refs_cid = self
            .tree
            .put_directory(ref_entries)
            .await
            .map_err(|e| Error::StorageError(format!("put refs dir: {}", e)))?;
        debug!("refs dir -> {}", hex::encode(refs_cid.hash));
        Ok(refs_cid)
    }
878
    /// Build a git index (version 2) covering the working tree rooted at
    /// `tree_oid`, so a checkout of the snapshot shows a clean status.
    fn build_index_file(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
    ) -> Result<Vec<u8>> {
        // (path, sha1, mode, size) per file, collected recursively.
        let mut entries: Vec<(String, [u8; 20], u32, u32)> = Vec::new();
        self.collect_tree_entries_for_index(tree_oid, objects, "", &mut entries)?;

        // Index entries must be sorted by path.
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let entry_count = entries.len() as u32;
        debug!("Building git index with {} entries", entry_count);

        let mut index_data = Vec::new();

        // Header: "DIRC" signature, version 2, entry count (big-endian).
        index_data.extend_from_slice(b"DIRC");
        index_data.extend_from_slice(&2u32.to_be_bytes());
        index_data.extend_from_slice(&entry_count.to_be_bytes());

        // Timestamps use "now"; dev/ino/uid/gid are zeroed, which makes git
        // re-stat files on first use.
        let now_sec = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as u32;

        for (path, sha1, mode, size) in &entries {
            let entry_start = index_data.len();

            index_data.extend_from_slice(&now_sec.to_be_bytes()); // ctime seconds
            index_data.extend_from_slice(&0u32.to_be_bytes()); // ctime nanos
            index_data.extend_from_slice(&now_sec.to_be_bytes()); // mtime seconds
            index_data.extend_from_slice(&0u32.to_be_bytes()); // mtime nanos
            index_data.extend_from_slice(&0u32.to_be_bytes()); // dev
            index_data.extend_from_slice(&0u32.to_be_bytes()); // ino
            index_data.extend_from_slice(&mode.to_be_bytes()); // mode
            index_data.extend_from_slice(&0u32.to_be_bytes()); // uid
            index_data.extend_from_slice(&0u32.to_be_bytes()); // gid
            index_data.extend_from_slice(&size.to_be_bytes()); // file size
            index_data.extend_from_slice(sha1); // object id
            // Flags: stage 0; path length is capped at 0xFFF per the format.
            let path_len = std::cmp::min(path.len(), 0xFFF) as u16;
            index_data.extend_from_slice(&path_len.to_be_bytes());
            index_data.extend_from_slice(path.as_bytes());
            index_data.push(0);
            // Pad each entry with NULs to an 8-byte boundary.
            let entry_len = index_data.len() - entry_start;
            let padding = (8 - (entry_len % 8)) % 8;
            index_data.extend(std::iter::repeat_n(0, padding));
        }

        // Trailer: SHA-1 checksum of everything written so far.
        let mut hasher = Sha1::new();
        hasher.update(&index_data);
        let checksum = hasher.finalize();
        index_data.extend_from_slice(&checksum);

        debug!(
            "Built git index: {} bytes, {} entries",
            index_data.len(),
            entry_count
        );
        Ok(index_data)
    }
957
    /// Recursively collect `(path, sha1, mode, size)` tuples for every blob
    /// reachable from git tree `tree_oid`, for use in the index file.
    fn collect_tree_entries_for_index(
        &self,
        tree_oid: &str,
        objects: &HashMap<String, Vec<u8>>,
        prefix: &str,
        entries: &mut Vec<(String, [u8; 20], u32, u32)>,
    ) -> Result<()> {
        let (obj_type, content) = self
            .get_object_content(tree_oid, objects)
            .ok_or_else(|| Error::ObjectNotFound(tree_oid.to_string()))?;

        if obj_type != ObjectType::Tree {
            return Err(Error::InvalidObjectType(format!(
                "expected tree, got {:?}",
                obj_type
            )));
        }

        let tree_entries = parse_tree(&content)?;

        for entry in tree_entries {
            let path = if prefix.is_empty() {
                entry.name.clone()
            } else {
                format!("{}/{}", prefix, entry.name)
            };

            let oid_hex = entry.oid.to_hex();

            if entry.is_tree() {
                self.collect_tree_entries_for_index(&oid_hex, objects, &path, entries)?;
            } else {
                // Non-blob entries (e.g. submodules) are skipped silently.
                if let Some((ObjectType::Blob, blob_content)) =
                    self.get_object_content(&oid_hex, objects)
                {
                    // NOTE(review): a hex-decode failure leaves a zeroed sha1
                    // in the index entry rather than erroring — confirm this
                    // best-effort behavior is intended.
                    let mut sha1_bytes = [0u8; 20];
                    if let Ok(bytes) = hex::decode(&oid_hex) {
                        if bytes.len() == 20 {
                            sha1_bytes.copy_from_slice(&bytes);
                        }
                    }

                    let mode = entry.mode;
                    let size = blob_content.len() as u32;

                    entries.push((path, sha1_bytes, mode, size));
                }
            }
        }

        Ok(())
    }
1015
    /// Shared handle to the underlying blob store.
    pub fn store(&self) -> &Arc<LocalStore> {
        &self.store
    }
1020
    /// The hashtree used to publish repository snapshots.
    #[allow(dead_code)]
    pub fn hashtree(&self) -> &HashTree<LocalStore> {
        &self.tree
    }
1026
    /// Upload every local blob to the Blossom file servers, skipping blobs
    /// the servers already have.
    ///
    /// Returns `(uploaded, already_existed)`. Individual upload failures and
    /// locally unreadable blobs are logged and skipped, not fatal.
    #[allow(dead_code)]
    pub fn push_to_file_servers(
        &self,
        blossom: &hashtree_blossom::BlossomClient,
    ) -> Result<(usize, usize)> {
        let hashes = self
            .store
            .list()
            .map_err(|e| Error::StorageError(format!("list hashes: {}", e)))?;

        info!("Pushing {} blobs to file servers", hashes.len());

        let mut uploaded = 0;
        let mut existed = 0;

        self.runtime.block_on(async {
            for hash in &hashes {
                let hex_hash = hex::encode(hash);
                // Blobs that disappeared or failed to read are skipped.
                let data = match self.store.get_sync(hash) {
                    Ok(Some(d)) => d,
                    _ => continue,
                };

                match blossom.upload_if_missing(&data).await {
                    Ok((_, true)) => {
                        debug!("Uploaded {}", &hex_hash[..12]);
                        uploaded += 1;
                    }
                    Ok((_, false)) => {
                        existed += 1;
                    }
                    Err(e) => {
                        debug!("Failed to upload {}: {}", &hex_hash[..12], e);
                    }
                }
            }
        });

        info!(
            "Upload complete: {} new, {} already existed",
            uploaded, existed
        );
        Ok((uploaded, existed))
    }
1072
    /// Drop all cached objects, refs, and the cached root CID.
    ///
    /// All three locks are acquired up front, in objects -> refs -> root
    /// order, so the clear is atomic with respect to other accessors.
    #[allow(dead_code)]
    pub fn clear(&self) -> Result<()> {
        let mut objects = self
            .objects
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let mut refs = self
            .refs
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;
        let mut root = self
            .root_cid
            .write()
            .map_err(|e| Error::StorageError(format!("lock: {}", e)))?;

        objects.clear();
        refs.clear();
        *root = None;
        Ok(())
    }
1094}
1095
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Storage rooted in a fresh temp dir, limits taken from config.
    fn create_test_storage() -> (GitStorage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage = GitStorage::open(temp_dir.path()).unwrap();
        (storage, temp_dir)
    }

    /// Storage on the Fs backend with an explicit byte cap.
    fn create_test_storage_with_limit(max_size_bytes: u64) -> (GitStorage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let storage = GitStorage::open_with_backend_and_max_bytes(
            temp_dir.path(),
            StorageBackend::Fs,
            max_size_bytes,
        )
        .unwrap();
        (storage, temp_dir)
    }

    /// Total bytes currently held by the underlying blob store.
    fn local_total_bytes(storage: &GitStorage) -> u64 {
        match storage.store().as_ref() {
            LocalStore::Fs(store) => store.stats().unwrap().total_bytes,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.stats().unwrap().total_bytes,
        }
    }

    #[test]
    fn test_import_ref() {
        let (storage, _temp) = create_test_storage();

        storage
            .import_ref("refs/heads/main", "abc123def456")
            .unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());

        let refs = storage.list_refs().unwrap();
        assert_eq!(
            refs.get("refs/heads/main"),
            Some(&"abc123def456".to_string())
        );
    }

    #[test]
    fn test_import_multiple_refs_preserves_all() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage.import_ref("refs/heads/dev", "sha_dev").unwrap();
        storage
            .import_ref("refs/heads/feature", "sha_feature")
            .unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());
        assert!(storage.has_ref("refs/heads/dev").unwrap());
        assert!(storage.has_ref("refs/heads/feature").unwrap());

        // A write_ref after imports must not clobber the imported refs.
        storage
            .write_ref(
                "refs/heads/new-branch",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        let refs = storage.list_refs().unwrap();
        assert_eq!(refs.len(), 4);
        assert!(refs.contains_key("refs/heads/main"));
        assert!(refs.contains_key("refs/heads/dev"));
        assert!(refs.contains_key("refs/heads/feature"));
        assert!(refs.contains_key("refs/heads/new-branch"));
    }

    #[test]
    fn test_import_compressed_object() {
        let (storage, _temp) = create_test_storage();

        // Only a zlib header followed by junk: import does not validate.
        let fake_compressed = vec![0x78, 0x9c, 0x01, 0x02, 0x03];
        storage
            .import_compressed_object("abc123def456", fake_compressed.clone())
            .unwrap();

        assert_eq!(storage.object_count().unwrap(), 1);
    }

    #[test]
    fn test_write_ref_overwrites_imported() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "old_sha").unwrap();

        storage
            .write_ref(
                "refs/heads/main",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        let refs = storage.list_refs().unwrap();
        assert_eq!(
            refs.get("refs/heads/main"),
            Some(&"0123456789abcdef0123456789abcdef01234567".to_string())
        );
    }

    #[test]
    fn test_delete_ref_preserves_others() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage.import_ref("refs/heads/dev", "sha_dev").unwrap();

        storage.delete_ref("refs/heads/dev").unwrap();

        assert!(storage.has_ref("refs/heads/main").unwrap());
        assert!(!storage.has_ref("refs/heads/dev").unwrap());
    }

    #[test]
    fn test_clear_removes_all() {
        let (storage, _temp) = create_test_storage();

        storage.import_ref("refs/heads/main", "sha_main").unwrap();
        storage
            .import_compressed_object("obj1", vec![1, 2, 3])
            .unwrap();

        storage.clear().unwrap();

        assert!(!storage.has_ref("refs/heads/main").unwrap());
        assert_eq!(storage.object_count().unwrap(), 0);
    }

    #[test]
    fn test_evict_if_needed_respects_configured_limit() {
        let (storage, _temp) = create_test_storage_with_limit(1_024);

        // Write enough data to exceed the 1 KiB cap, then build a snapshot
        // so the blobs land in the underlying store.
        storage
            .write_raw_object(ObjectType::Blob, &vec![b'a'; 900])
            .unwrap();
        storage
            .write_raw_object(ObjectType::Blob, &vec![b'b'; 900])
            .unwrap();
        storage
            .write_ref(
                "refs/heads/main",
                &Ref::Direct(
                    ObjectId::from_hex("0123456789abcdef0123456789abcdef01234567").unwrap(),
                ),
            )
            .unwrap();

        storage.build_tree().unwrap();

        let before = local_total_bytes(&storage);
        assert!(before > 1_024);

        let freed = storage.evict_if_needed().unwrap();
        assert!(freed > 0);

        let after = local_total_bytes(&storage);
        assert!(after <= 1_024);
    }
}
1285}