1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod index;
6pub mod manifest;
7pub mod store;
8pub mod wal;
9
10use crate::commit::Commit;
11use crate::compression::Compression;
12use crate::index::Index;
13use crate::manifest::FileManifest;
14use crate::store::Store;
15use anyhow::Result;
16use ed25519_dalek::{Signer, Verifier};
17use serde::Serialize;
18use shard_crypto::KeyPair;
19use shard_net::libp2p::futures::StreamExt;
20use std::collections::HashMap;
21use std::fs;
22use std::io::Write;
23use std::path::Path;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25use tracing::{error, info};
26
27pub fn init(
28 path: &Path,
29 backend: &str,
30 compression_algo: &str,
31 chunker_mode: &str,
32 chunk_size: Option<u64>,
33) -> Result<()> {
34 let shard_dir = path.join(".shard");
35 if shard_dir.exists() {
36 anyhow::bail!(
37 "repository already initialized at {} (run `shard status` to confirm)",
38 shard_dir.display()
39 );
40 }
41 fs::create_dir_all(shard_dir.join("objects"))?;
42 fs::create_dir_all(shard_dir.join("keys"))?;
43 fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
44 branch::set_head_branch(&shard_dir, "main")?;
45
46 let keys = KeyPair::generate();
47 keys.save(&shard_dir.join("keys"))?;
48
49 let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
52 let repo_id = blake3::hash(&pubkey).to_hex().to_string();
53 let mut config = load_config(&shard_dir)?;
54 config.insert("repo_id".to_string(), repo_id);
55 config.insert("storage_backend".to_string(), backend.to_string());
56 config.insert("compression".to_string(), compression_algo.to_string());
57 config.insert("chunker_mode".to_string(), chunker_mode.to_string());
58 match chunker_mode {
59 "rabin" => {
60 let chunk_size = chunk_size.unwrap_or(4_194_304);
61 let min = chunk_size / 4;
62 let max = chunk_size * 2;
63 config.insert("chunk_min".to_string(), min.to_string());
64 config.insert("chunk_avg".to_string(), chunk_size.to_string());
65 config.insert("chunk_max".to_string(), max.to_string());
66 }
67 _ => {
68 let cs = chunk_size.unwrap_or(4_194_304);
69 config.insert("chunk_size".to_string(), cs.to_string());
70 }
71 }
72 save_config(&shard_dir, &config)?;
73
74 let chunker_desc = if chunker_mode == "rabin" {
75 format!(
76 "rabin (avg {} bytes)",
77 config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
78 )
79 } else {
80 format!(
81 "fixed ({} bytes)",
82 config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
83 )
84 };
85 info!(
86 "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
87 shard_dir.display(),
88 backend,
89 compression_algo,
90 chunker_desc,
91 );
92 Ok(())
93}
94
/// Returns `file_path` expressed relative to `repo_root`, as a lossy string.
///
/// Both paths are canonicalized when possible; a path that cannot be
/// canonicalized is used as given. If the file does not live under the
/// repository root, the bare file name is returned instead (or an empty string
/// when the path has no file name).
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let resolve = |p: &Path| match p.canonicalize() {
        Ok(abs) => abs,
        Err(_) => p.to_path_buf(),
    };
    let repo_abs = resolve(repo_root);
    let file_abs = resolve(file_path);

    match file_abs.strip_prefix(&repo_abs) {
        Ok(inside) => inside.to_string_lossy().to_string(),
        Err(_) => match file_path.file_name() {
            Some(base) => base.to_string_lossy().to_string(),
            None => String::new(),
        },
    }
}
111
112fn add_file(
113 repo_root: &Path,
114 file_path: &Path,
115 store: &Store,
116 index: &mut Index,
117 compression: &Compression,
118 chunker_mode: &chunker::ChunkerMode,
119) -> Result<()> {
120 let file = fs::File::open(file_path)?;
121 let mut chunker = match chunker_mode {
122 chunker::ChunkerMode::Fixed { chunk_size } => {
123 chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
124 }
125 chunker::ChunkerMode::Rabin { min, avg, max } => {
126 chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
127 }
128 };
129 let mut chunk_hashes = Vec::new();
130 let mut total_size = 0;
131
132 while let Some(chunk) = chunker.next_chunk()? {
133 let hash = chunk.hash;
134 let compressed_data = compression.compress(&chunk.data)?;
135 let stored = crate::chunker::Chunk {
136 hash,
137 data: compressed_data,
138 offset: chunk.offset,
139 };
140 store.put_chunk(&stored)?;
141 chunk_hashes.push(hash.to_hex().to_string());
142 total_size += chunk.data.len() as u64;
143 }
144
145 let name = relative_path(repo_root, file_path);
146 let manifest = FileManifest {
147 name: name.clone(),
148 size: total_size,
149 chunks: chunk_hashes,
150 content_type: None,
151 compression: compression.as_str().to_string(),
152 };
153
154 index.files.insert(name.clone(), manifest);
155 info!("Added {} ({})", name, total_size);
156 Ok(())
157}
158
159pub fn recover(path: &Path) -> Result<()> {
160 let shard_dir = path.join(".shard");
161 if !shard_dir.exists() {
162 anyhow::bail!("not a shard repository (run `shard init` first)");
163 }
164 wal::recover(&shard_dir)?;
165 info!("Recovery complete.");
166 Ok(())
167}
168
169pub fn add(path: &Path, file_path: &Path) -> Result<()> {
170 let shard_dir = path.join(".shard");
171 if !shard_dir.exists() {
172 anyhow::bail!("not a shard repository (run `shard init` first)");
173 }
174
175 wal::recover(&shard_dir)?;
176
177 let config = load_config(&shard_dir)?;
178 let compression: Compression = config
179 .get("compression")
180 .map(|s| s.as_str())
181 .unwrap_or("zstd")
182 .parse()?;
183
184 let chunker_mode = chunker::ChunkerMode::from_config(&config);
185
186 let store = Store::open(&shard_dir)?;
187 let mut index = Index::load(&shard_dir.join("index"))?;
188
189 if file_path.is_dir() {
190 for entry in walkdir::WalkDir::new(file_path)
191 .into_iter()
192 .filter_entry(|e| {
193 e.file_name()
194 .to_str()
195 .map(|s| !s.starts_with('.'))
196 .unwrap_or(false)
197 })
198 {
199 let entry = entry?;
200 if entry.file_type().is_file() {
201 add_file(
202 path,
203 entry.path(),
204 &store,
205 &mut index,
206 &compression,
207 &chunker_mode,
208 )?;
209 }
210 }
211 } else {
212 add_file(
213 path,
214 file_path,
215 &store,
216 &mut index,
217 &compression,
218 &chunker_mode,
219 )?;
220 }
221
222 index.save(&shard_dir.join("index"))?;
223 Ok(())
224}
225
226pub fn commit(path: &Path, message: &str, author: &str) -> Result<()> {
227 let shard_dir = path.join(".shard");
228 if !shard_dir.exists() {
229 anyhow::bail!("not a shard repository (run `shard init` first)");
230 }
231
232 wal::recover(&shard_dir)?;
234
235 let store = Store::open(&shard_dir)?;
236 let mut index = Index::load(&shard_dir.join("index"))?;
237
238 if index.files.is_empty() {
239 anyhow::bail!("nothing to commit (stage files with `shard add` first)");
240 }
241
242 let head_path = shard_dir.join("HEAD");
243
244 let wal = wal::Wal::new(&shard_dir);
246 let head_backup = fs::read_to_string(&head_path).ok();
247 let index_backup = fs::read(shard_dir.join("index"))?;
248 wal.append(&wal::WalEntry::CommitBegin {
249 head_backup,
250 index_backup,
251 })?;
252
253 let mut manifest_ids = Vec::new();
255 for manifest in index.files.values() {
256 let json = serde_json::to_vec(manifest)?;
257 let hash = blake3::hash(&json);
258 let chunk = crate::chunker::Chunk {
259 hash,
260 data: json,
261 offset: 0,
262 };
263 store.put_chunk(&chunk)?;
264 manifest_ids.push(hash.to_hex().to_string());
265 }
266 manifest_ids.sort();
267
268 let mut parents = Vec::new();
270 let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
271 if let Some(pid) = parent_id {
272 parents.push(pid);
273 }
274
275 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
277 let keys = KeyPair::load(&shard_dir.join("keys"))?;
278 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
279 let mut commit = Commit {
280 commit_id: String::new(),
281 parents,
282 manifests: manifest_ids,
283 author: author.to_string(),
284 message: message.to_string(),
285 timestamp,
286 public_key: Some(public_key_hex),
287 signature: None,
288 };
289
290 let signing_key = keys.signing_key;
292 let json_unsigned = serde_json::to_vec(&commit)?;
293 let signature = signing_key.sign(&json_unsigned);
294 commit.signature = Some(hex::encode(signature.to_bytes()));
295
296 let json_final = serde_json::to_vec(&commit)?;
298 let hash = blake3::hash(&json_final);
299 let chunk = crate::chunker::Chunk {
300 hash,
301 data: json_final,
302 offset: 0,
303 };
304 store.put_chunk(&chunk)?;
305
306 let commit_id = hash.to_hex().to_string();
308 if let Some(ref branch_name) = current_branch {
309 branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
310 branch::set_head_branch(&shard_dir, branch_name)?;
311 } else {
312 fs::write(&head_path, &commit_id)?;
313 }
314
315 index.files.clear();
317 index.save(&shard_dir.join("index"))?;
318
319 wal.append(&wal::WalEntry::CommitEnd)?;
321 wal.truncate()?;
322
323 info!("Committed {} ({})", commit_id, message);
324 Ok(())
325}
326
327pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
328 let shard_dir = path.join(".shard");
329 if !shard_dir.exists() {
330 anyhow::bail!("not a shard repository (run `shard init` first)");
331 }
332
333 if commit_id.len() < 2 {
334 anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
335 }
336 let store = Store::open(&shard_dir)?;
337 let commit_data = store.get_chunk(commit_id)?;
338 let commit: Commit = serde_json::from_slice(&commit_data)?;
339
340 let mut sig_verified = false;
341 let mut files_checked = 0u64;
342
343 if let Some(sig_hex) = &commit.signature {
344 let verifying_key = if let Some(pk_hex) = &commit.public_key {
345 let pk_bytes = hex::decode(pk_hex)?;
346 ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
347 } else {
348 let pub_key_path = shard_dir.join("keys/public.key");
349 let pub_bytes = fs::read(pub_key_path)?;
350 ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
351 };
352
353 let mut unsigned_commit = commit.clone();
354 unsigned_commit.signature = None;
355 let json_unsigned = serde_json::to_vec(&unsigned_commit)?;
356
357 let sig_bytes = hex::decode(sig_hex)?;
358 let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
359
360 verifying_key.verify(&json_unsigned, &signature)?;
361 sig_verified = true;
362 if !json {
363 info!("Signature verified.");
364 }
365 } else if !json {
366 info!("Warning: Commit is unsigned.");
367 }
368
369 for manifest_id in &commit.manifests {
370 let manifest_data = store.get_chunk(manifest_id)?;
371 let hash = blake3::hash(&manifest_data);
372 if hash.to_hex().to_string() != *manifest_id {
373 anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
374 }
375
376 let manifest: FileManifest = serde_json::from_slice(&manifest_data)?;
377 let compression = manifest.compression.parse::<Compression>()?;
378 if !json {
379 info!(
380 "Verifying file: {} (compression: {})",
381 manifest.name, manifest.compression
382 );
383 }
384
385 for chunk_id in &manifest.chunks {
386 let chunk_data = store.get_chunk(chunk_id)?;
387 let decompressed = compression.decompress(&chunk_data)?;
388 let hash = blake3::hash(&decompressed);
389 if hash.to_hex().to_string() != *chunk_id {
390 anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
391 }
392 }
393 files_checked += 1;
394 }
395
396 if json {
397 info!(
398 "{}",
399 serde_json::to_string(&serde_json::json!({
400 "commit_id": commit_id,
401 "verified": true,
402 "signature_verified": sig_verified,
403 "files_checked": files_checked,
404 }))?
405 );
406 } else {
407 info!("Verification successful.");
408 }
409 Ok(())
410}
411
412fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
413 if commit_id.len() < 2 {
414 anyhow::bail!(
415 "commit id too short (got {} chars, need at least 2): '{}'",
416 commit_id.len(),
417 commit_id
418 );
419 };
420 let data = store.get_chunk(commit_id)?;
421 let mut commit: Commit = serde_json::from_slice(&data)?;
422 commit.commit_id = commit_id.to_string();
423 Ok(commit)
424}
425
426#[derive(Serialize)]
427pub struct LogEntry {
428 pub commit_id: String,
429 pub parents: Vec<String>,
430 pub manifests: Vec<String>,
431 pub author: String,
432 pub message: String,
433 pub timestamp: u64,
434 pub signature: Option<String>,
435}
436
437impl From<Commit> for LogEntry {
438 fn from(c: Commit) -> Self {
439 LogEntry {
440 commit_id: c.commit_id,
441 parents: c.parents,
442 manifests: c.manifests,
443 author: c.author,
444 message: c.message,
445 timestamp: c.timestamp,
446 signature: c.signature,
447 }
448 }
449}
450
451pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
452 let shard_dir = path.join(".shard");
453 if !shard_dir.exists() {
454 anyhow::bail!("not a shard repository (run `shard init` first)");
455 }
456
457 let store = Store::open(&shard_dir)?;
458
459 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
460 let head = head_commit
461 .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
462
463 let mut entries: Vec<LogEntry> = Vec::new();
464 let mut seen = std::collections::HashSet::new();
465 let mut stack = vec![head];
466
467 while let Some(cid) = stack.pop() {
468 if !seen.insert(cid.clone()) {
469 continue;
470 }
471 let commit = load_commit(&store, &cid)?;
472 for parent in &commit.parents {
473 stack.push(parent.clone());
474 }
475 entries.push(commit.into());
476 }
477
478 if json {
479 info!("{}", serde_json::to_string_pretty(&entries)?);
480 } else {
481 for entry in &entries {
482 let ts = {
483 let secs = entry.timestamp as i64;
484 let tm = time::OffsetDateTime::from_unix_timestamp(secs)
485 .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
486 tm.format(&time::format_description::well_known::Rfc3339)
487 .unwrap_or_else(|_| entry.timestamp.to_string())
488 };
489 info!("commit {}", entry.commit_id);
490 if !entry.parents.is_empty() {
491 info!("parents: {}", entry.parents.join(" "));
492 }
493 info!("author: {}", entry.author);
494 info!("date: {}", ts);
495 info!("");
496 for line in entry.message.lines() {
497 info!(" {}", line);
498 }
499 info!("");
500 }
501 }
502
503 Ok(())
504}
505
506pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
507 let shard_dir = path.join(".shard");
508 if !shard_dir.exists() {
509 anyhow::bail!("not a shard repository (run `shard init` first)");
510 }
511
512 let store = Store::open(&shard_dir)?;
513
514 let branch_path = shard_dir.join("refs").join("heads").join(target);
516 let commit_id = if branch_path.exists() {
517 let id = fs::read_to_string(&branch_path)?.trim().to_string();
518 branch::set_head_branch(&shard_dir, target)?;
519 id
520 } else {
521 branch::set_head_commit(&shard_dir, target)?;
522 target.to_string()
523 };
524
525 let commit = load_commit(&store, &commit_id)?;
526 let mut files = Vec::new();
527
528 for manifest_id in &commit.manifests {
529 let data = store.get_chunk(manifest_id)?;
530 let hash = blake3::hash(&data);
531 if hash.to_hex().to_string() != *manifest_id {
532 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
533 }
534 let manifest: FileManifest = serde_json::from_slice(&data)?;
535 let compression = manifest.compression.parse::<Compression>()?;
536 if !json {
537 info!(
538 "Checking out file: {} (compression: {})",
539 manifest.name, manifest.compression
540 );
541 }
542
543 let mut file_data = Vec::new();
544 for chunk_id in &manifest.chunks {
545 let chunk_data = store.get_chunk(chunk_id)?;
546 let decompressed = compression.decompress(&chunk_data)?;
547 file_data.extend_from_slice(&decompressed);
548 }
549 fs::write(path.join(&manifest.name), file_data)?;
550 if !json {
551 info!(" -> {}", manifest.name);
552 }
553 files.push(manifest.name);
554 }
555
556 if json {
557 info!(
558 "{}",
559 serde_json::to_string(&serde_json::json!({
560 "commit_id": commit_id,
561 "files": files,
562 }))?
563 );
564 } else {
565 info!("Checkout complete.");
566 }
567 Ok(())
568}
569
570pub fn status(path: &Path, json: bool) -> Result<()> {
571 let shard_dir = path.join(".shard");
572 if !shard_dir.exists() {
573 anyhow::bail!("not a shard repository (run `shard init` first)");
574 }
575
576 let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
577 let mut commit_id: Option<String> = None;
578 if let Some(cid) = head_commit {
579 commit_id = Some(cid);
580 if !json {
581 if let Some(ref branch) = current_branch {
582 info!("On branch: {}", branch);
583 } else {
584 info!("HEAD detached at {}", commit_id.as_ref().unwrap());
585 }
586 }
587 } else if !json {
588 info!("No commits yet.");
589 }
590
591 let index = Index::load(&shard_dir.join("index"))?;
592 let staged: Vec<String> = index.files.keys().cloned().collect();
593 if !json {
594 if staged.is_empty() {
595 info!("Nothing staged.");
596 } else {
597 info!("\nStaged files:");
598 for name in &staged {
599 info!(" {} (to be committed)", name);
600 }
601 }
602 }
603
604 let store = Store::open(&shard_dir)?;
605 let mut deleted = Vec::new();
606 let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
607 let mut names = std::collections::HashSet::new();
608 if let Ok(commit) = load_commit(&store, head) {
609 for manifest_id in &commit.manifests {
610 if let Ok(data) = store.get_chunk(manifest_id) {
611 if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&data) {
612 let file_path = path.join(&manifest.name);
613 if !file_path.exists() {
614 deleted.push(manifest.name.clone());
615 }
616 names.insert(manifest.name);
617 }
618 }
619 }
620 }
621 names
622 } else {
623 std::collections::HashSet::new()
624 };
625
626 if !json && !deleted.is_empty() {
627 info!("\nDeleted files:");
628 for name in &deleted {
629 info!(" {} (deleted)", name);
630 }
631 }
632
633 let mut untracked = Vec::new();
634 if let Ok(entries) = fs::read_dir(path) {
635 for entry in entries.flatten() {
636 if let Ok(ftype) = entry.file_type() {
637 if ftype.is_file() {
638 let name = entry.file_name().to_string_lossy().to_string();
639 if !name.starts_with('.')
640 && !index.files.contains_key(&name)
641 && !tracked_names.contains(&name)
642 {
643 untracked.push(name);
644 }
645 }
646 }
647 }
648 }
649 if !json && !untracked.is_empty() {
650 info!("\nUntracked files:");
651 for name in &untracked {
652 info!(" {}", name);
653 }
654 }
655
656 if json {
657 info!(
658 "{}",
659 serde_json::to_string(&serde_json::json!({
660 "commit": commit_id,
661 "staged": staged,
662 "deleted": deleted,
663 "untracked": untracked,
664 }))?
665 );
666 }
667
668 Ok(())
669}
670
671fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
672 let config_path = shard_dir.join("config.json");
673 if config_path.exists() {
674 let data = fs::read(&config_path)?;
675 Ok(serde_json::from_slice(&data)?)
676 } else {
677 Ok(std::collections::BTreeMap::new())
678 }
679}
680
681fn save_config(
682 shard_dir: &Path,
683 config: &std::collections::BTreeMap<String, String>,
684) -> Result<()> {
685 let data = serde_json::to_string_pretty(config)?;
686 fs::write(shard_dir.join("config.json"), data)?;
687 Ok(())
688}
689
690pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
691 let shard_dir = path.join(".shard");
692 if !shard_dir.exists() {
693 anyhow::bail!("not a shard repository (run `shard init` first)");
694 }
695 let config = load_config(&shard_dir)?;
696 if let Some(key) = key {
697 match config.get(key) {
698 Some(value) => info!("{} = {}", key, value),
699 None => anyhow::bail!("config key not found: {}", key),
700 }
701 } else {
702 for (k, v) in &config {
703 info!("{} = {}", k, v);
704 }
705 }
706 Ok(())
707}
708
709pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
710 let shard_dir = path.join(".shard");
711 if !shard_dir.exists() {
712 anyhow::bail!("not a shard repository (run `shard init` first)");
713 }
714 let mut config = load_config(&shard_dir)?;
715 config.insert(key.to_string(), value.to_string());
716 save_config(&shard_dir, &config)?;
717 info!("{} = {}", key, value);
718 Ok(())
719}
720
721fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
722 let tags_path = shard_dir.join("tags.json");
723 if tags_path.exists() {
724 let data = fs::read(&tags_path)?;
725 Ok(serde_json::from_slice(&data)?)
726 } else {
727 Ok(std::collections::BTreeMap::new())
728 }
729}
730
731fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
732 let data = serde_json::to_string_pretty(tags)?;
733 fs::write(shard_dir.join("tags.json"), data)?;
734 Ok(())
735}
736
737pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
738 let shard_dir = path.join(".shard");
739 if !shard_dir.exists() {
740 anyhow::bail!("not a shard repository (run `shard init` first)");
741 }
742 let store = Store::open(&shard_dir)?;
744 load_commit(&store, commit_id)?;
745 let mut tags = load_tags(&shard_dir)?;
746 tags.insert(name.to_string(), commit_id.to_string());
747 save_tags(&shard_dir, &tags)?;
748 info!("Tagged '{}' -> {}", name, commit_id);
749 Ok(())
750}
751
752pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
753 let shard_dir = path.join(".shard");
754 if !shard_dir.exists() {
755 anyhow::bail!("not a shard repository (run `shard init` first)");
756 }
757 let id = match commit_id {
758 Some(cid) => cid.to_string(),
759 None => {
760 let (_, head) = branch::resolve_head(&shard_dir)?;
761 head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
762 }
763 };
764 let store = Store::open(&shard_dir)?;
766 load_commit(&store, &id)?;
767 branch::create_branch(&shard_dir, name, &id)
768}
769
770pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
771 let shard_dir = path.join(".shard");
772 if !shard_dir.exists() {
773 anyhow::bail!("not a shard repository (run `shard init` first)");
774 }
775 branch::delete_branch(&shard_dir, name)
776}
777
778pub fn branch_list(path: &Path) -> Result<()> {
779 let shard_dir = path.join(".shard");
780 if !shard_dir.exists() {
781 anyhow::bail!("not a shard repository (run `shard init` first)");
782 }
783 let (current, branches) = branch::list_branches(&shard_dir)?;
784 if branches.is_empty() {
785 info!("No branches.");
786 return Ok(());
787 }
788 for (name, commit_id) in &branches {
789 let prefix = if current.as_deref() == Some(name) {
790 "* "
791 } else {
792 " "
793 };
794 info!(
795 "{}{} ({})",
796 prefix,
797 name,
798 &commit_id[..8.min(commit_id.len())]
799 );
800 }
801 Ok(())
802}
803
804pub fn merge(path: &Path, branch: &str, message: &str, author: &str) -> Result<()> {
805 let shard_dir = path.join(".shard");
806 if !shard_dir.exists() {
807 anyhow::bail!("not a shard repository (run `shard init` first)");
808 }
809
810 let store = Store::open(&shard_dir)?;
811
812 let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
814 let current_id =
815 current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
816
817 let source_id = branch::resolve_rev(&shard_dir, branch)?;
819 if source_id == current_id {
820 anyhow::bail!("Already up to date — source is the same commit as HEAD");
821 }
822
823 let current_commit = load_commit(&store, ¤t_id)?;
825 let source_commit = load_commit(&store, &source_id)?;
826
827 let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
829 std::collections::HashMap::new();
830
831 for manifest_id in ¤t_commit.manifests {
832 let data = store.get_chunk(manifest_id)?;
833 let manifest: FileManifest = serde_json::from_slice(&data)?;
834 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
835 }
836
837 for manifest_id in &source_commit.manifests {
838 let data = store.get_chunk(manifest_id)?;
839 let manifest: FileManifest = serde_json::from_slice(&data)?;
840 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
841 }
842
843 let mut merged_manifest_ids = Vec::new();
845 for (name, chunks) in merged_manifests.values() {
846 let compression = Compression::None;
847 let manifest = FileManifest {
848 name: name.clone(),
849 size: 0,
850 chunks: chunks.clone(),
851 content_type: None,
852 compression: compression.as_str().to_string(),
853 };
854 let json = serde_json::to_vec(&manifest)?;
855 let hash = blake3::hash(&json);
856 store.put_chunk(&crate::chunker::Chunk {
857 hash,
858 data: json,
859 offset: 0,
860 })?;
861 merged_manifest_ids.push(hash.to_hex().to_string());
862 }
863 merged_manifest_ids.sort();
864
865 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
867 let keys = KeyPair::load(&shard_dir.join("keys"))?;
868 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
869 let parents = vec![current_id.clone(), source_id.clone()];
870 let mut commit = Commit {
871 commit_id: String::new(),
872 parents,
873 manifests: merged_manifest_ids,
874 author: author.to_string(),
875 message: message.to_string(),
876 timestamp,
877 public_key: Some(public_key_hex),
878 signature: None,
879 };
880
881 let signing_key = keys.signing_key;
882 let json_unsigned = serde_json::to_vec(&commit)?;
883 let signature = signing_key.sign(&json_unsigned);
884 commit.signature = Some(hex::encode(signature.to_bytes()));
885
886 let json_final = serde_json::to_vec(&commit)?;
887 let hash = blake3::hash(&json_final);
888 store.put_chunk(&crate::chunker::Chunk {
889 hash,
890 data: json_final,
891 offset: 0,
892 })?;
893
894 let merge_commit_id = hash.to_hex().to_string();
895
896 if let Some(ref branch_name) = current_branch {
898 branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
899 branch::set_head_branch(&shard_dir, branch_name)?;
900 } else {
901 branch::set_head_commit(&shard_dir, &merge_commit_id)?;
902 }
903
904 info!("Merge commit {} ({})", merge_commit_id, message);
905 Ok(())
906}
907
908pub fn tag_list(path: &Path) -> Result<()> {
909 let shard_dir = path.join(".shard");
910 if !shard_dir.exists() {
911 anyhow::bail!("not a shard repository (run `shard init` first)");
912 }
913 let tags = load_tags(&shard_dir)?;
914 if tags.is_empty() {
915 info!("No tags.");
916 } else {
917 for (name, commit_id) in &tags {
918 info!("{} -> {}", name, commit_id);
919 }
920 }
921 Ok(())
922}
923
924fn collect_reachable(
925 store: &Store,
926 commit_id: &str,
927 seen_commits: &mut std::collections::HashSet<String>,
928 reachable: &mut std::collections::HashSet<String>,
929) -> Result<()> {
930 if !seen_commits.insert(commit_id.to_string()) {
931 return Ok(());
932 }
933
934 reachable.insert(commit_id.to_string());
935
936 let commit = match load_commit(store, commit_id) {
937 Ok(c) => c,
938 Err(_) => return Ok(()),
939 };
940
941 for manifest_id in &commit.manifests {
942 reachable.insert(manifest_id.clone());
943
944 if let Ok(data) = store.get_chunk(manifest_id) {
945 if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&data) {
946 for chunk_id in &manifest.chunks {
947 reachable.insert(chunk_id.clone());
948 }
949 }
950 }
951 }
952
953 for parent_id in &commit.parents {
954 collect_reachable(store, parent_id, seen_commits, reachable)?;
955 }
956
957 Ok(())
958}
959
960pub fn prune(path: &Path) -> Result<()> {
961 let shard_dir = path.join(".shard");
962 if !shard_dir.exists() {
963 anyhow::bail!("not a shard repository (run `shard init` first)");
964 }
965
966 let store = Store::open(&shard_dir)?;
967 let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
968
969 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
971 if let Some(ref head) = head_commit {
972 collect_reachable(
973 &store,
974 head,
975 &mut std::collections::HashSet::new(),
976 &mut reachable,
977 )?;
978 }
979
980 if let Ok(branches) = branch::list_branches(&shard_dir) {
982 for (_, commit_id) in branches.1 {
983 collect_reachable(
984 &store,
985 &commit_id,
986 &mut std::collections::HashSet::new(),
987 &mut reachable,
988 )?;
989 }
990 }
991
992 let tags = load_tags(&shard_dir)?;
994 for commit_id in tags.values() {
995 collect_reachable(
996 &store,
997 commit_id,
998 &mut std::collections::HashSet::new(),
999 &mut reachable,
1000 )?;
1001 }
1002
1003 let index = Index::load(&shard_dir.join("index"))?;
1005 for manifest in index.files.values() {
1006 let json = serde_json::to_vec(manifest)?;
1007 let hash = blake3::hash(&json);
1008 let hash_hex = hash.to_hex().to_string();
1009 reachable.insert(hash_hex);
1010 for chunk_hash in &manifest.chunks {
1011 reachable.insert(chunk_hash.clone());
1012 }
1013 }
1014
1015 let all_chunks = store.iter_chunks()?;
1017 let mut pruned = 0u64;
1018 let mut kept = 0u64;
1019 for (hash_hex, full_path) in &all_chunks {
1020 if !reachable.contains(hash_hex) {
1021 store.delete_chunk(hash_hex, Some(full_path))?;
1022 pruned += 1;
1023 } else {
1024 kept += 1;
1025 }
1026 }
1027
1028 info!("Pruned {} objects. {} objects remain.", pruned, kept);
1029 Ok(())
1030}
1031
1032pub fn peer_add(path: &Path, multiaddr: &str) -> Result<()> {
1033 let shard_dir = path.join(".shard");
1034 if !shard_dir.exists() {
1035 anyhow::bail!("not a shard repository (run `shard init` first)");
1036 }
1037
1038 if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1040 anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1041 }
1042
1043 let peers_path = shard_dir.join("peers.json");
1044 let mut peers: Vec<String> = if peers_path.exists() {
1045 let data = fs::read(&peers_path)?;
1046 serde_json::from_slice(&data)?
1047 } else {
1048 Vec::new()
1049 };
1050
1051 if !peers.contains(&multiaddr.to_string()) {
1052 peers.push(multiaddr.to_string());
1053 let data = serde_json::to_vec(&peers)?;
1054 fs::write(peers_path, data)?;
1055 info!("Added peer: {}", multiaddr);
1056 } else {
1057 info!("Peer already exists: {}", multiaddr);
1058 }
1059
1060 Ok(())
1061}
1062
1063fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1064 let peers_path = shard_dir.join("peers.json");
1065 if peers_path.exists() {
1066 let data = fs::read(peers_path)?;
1067 Ok(serde_json::from_slice(&data)?)
1068 } else {
1069 Ok(Vec::new())
1070 }
1071}
1072
/// Returns the location of the `authorized_keys` file inside `shard_dir`.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut location = shard_dir.to_path_buf();
    location.push("authorized_keys");
    location
}
1076
1077fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1078 let path = authorized_keys_path(shard_dir);
1079 if !path.exists() {
1080 return Ok(Vec::new());
1081 }
1082 let content = fs::read_to_string(&path)?;
1083 let mut keys = Vec::new();
1084 for line in content.lines() {
1085 let line = line.trim();
1086 if line.is_empty() || line.starts_with('#') {
1087 continue;
1088 }
1089 let bytes = hex::decode(line)?;
1090 let arr: [u8; 32] = bytes
1091 .as_slice()
1092 .try_into()
1093 .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1094 keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1095 }
1096 Ok(keys)
1097}
1098
1099pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1100 let bytes = hex::decode(public_key_hex)?;
1102 let arr: [u8; 32] = bytes
1103 .as_slice()
1104 .try_into()
1105 .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1106 let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1107
1108 let path = authorized_keys_path(shard_dir);
1109 let mut content = if path.exists() {
1110 fs::read_to_string(&path)?
1111 } else {
1112 String::new()
1113 };
1114 if content.lines().any(|l| l.trim() == public_key_hex) {
1116 info!("Key already authorized");
1117 return Ok(());
1118 }
1119 content.push_str(public_key_hex);
1120 content.push('\n');
1121 fs::write(&path, content)?;
1122 info!("Authorized key added");
1123 Ok(())
1124}
1125
1126pub fn backup(path: &Path, output: &Path) -> Result<()> {
1127 let shard_dir = path.join(".shard");
1128 if !shard_dir.exists() {
1129 anyhow::bail!("not a shard repository (run `shard init` first)");
1130 }
1131 let file = fs::File::create(output)?;
1132 let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1133 let mut archive = tar::Builder::new(encoder);
1134 archive.append_dir_all(".", &shard_dir)?;
1135 archive.finish()?;
1136 info!("Backup created: {}", output.display());
1137 Ok(())
1138}
1139
1140pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1141 let shard_dir = path.join(".shard");
1142 if !shard_dir.exists() {
1143 anyhow::bail!("not a shard repository (run `shard init` first)");
1144 }
1145 let store = Store::open(&shard_dir)?;
1146 let commit = load_commit(&store, commit_id)?;
1147 let mut files = Vec::new();
1148 for manifest_id in &commit.manifests {
1149 let data = store.get_chunk(manifest_id)?;
1150 let manifest: FileManifest = serde_json::from_slice(&data)?;
1151 let compression = manifest.compression.parse::<Compression>()?;
1152 if !json {
1153 info!("Exporting file: {}", manifest.name);
1154 }
1155 let mut file_data = Vec::new();
1156 for chunk_id in &manifest.chunks {
1157 let chunk_data = store.get_chunk(chunk_id)?;
1158 let decompressed = compression.decompress(&chunk_data)?;
1159 file_data.extend_from_slice(&decompressed);
1160 }
1161 let out_path = output_dir.join(&manifest.name);
1162 if let Some(parent) = out_path.parent() {
1163 fs::create_dir_all(parent)?;
1164 }
1165 fs::write(&out_path, file_data)?;
1166 if !json {
1167 info!(" -> {}", out_path.display());
1168 }
1169 files.push(manifest.name);
1170 }
1171 if json {
1172 info!(
1173 "{}",
1174 serde_json::to_string(&serde_json::json!({
1175 "commit_id": commit_id,
1176 "files": files,
1177 "output_dir": output_dir.to_string_lossy(),
1178 }))?
1179 );
1180 } else {
1181 info!("Export complete.");
1182 }
1183 Ok(())
1184}
1185
1186pub fn import(path: &Path, source_dir: &Path, message: &str, author: &str) -> Result<()> {
1187 let shard_dir = path.join(".shard");
1188 if !shard_dir.exists() {
1189 anyhow::bail!("not a shard repository (run `shard init` first)");
1190 }
1191 let config = load_config(&shard_dir)?;
1193 let compression: Compression = config
1194 .get("compression")
1195 .map(|s| s.as_str())
1196 .unwrap_or("zstd")
1197 .parse()?;
1198 let chunker_mode = chunker::ChunkerMode::from_config(&config);
1199 let store = Store::open(&shard_dir)?;
1200 let mut index = Index::load(&shard_dir.join("index"))?;
1201 if !source_dir.is_dir() {
1202 anyhow::bail!("Source must be a directory");
1203 }
1204 for entry in walkdir::WalkDir::new(source_dir)
1205 .into_iter()
1206 .filter_entry(|e| {
1207 e.file_name()
1208 .to_str()
1209 .map(|s| !s.starts_with('.'))
1210 .unwrap_or(false)
1211 })
1212 {
1213 let entry = entry?;
1214 if entry.file_type().is_file() {
1215 add_file(
1216 path,
1217 entry.path(),
1218 &store,
1219 &mut index,
1220 &compression,
1221 &chunker_mode,
1222 )?;
1223 }
1224 }
1225 index.save(&shard_dir.join("index"))?;
1226 if !index.files.is_empty() {
1228 commit(path, message, author)?;
1229 } else {
1230 info!("No files found to import.");
1231 }
1232 Ok(())
1233}
1234
1235pub fn restore(path: &Path, backup_file: &Path) -> Result<()> {
1236 let shard_dir = path.join(".shard");
1237 if shard_dir.exists() {
1238 anyhow::bail!(
1239 "Repository already exists — remove .shard first or use a different directory"
1240 );
1241 }
1242 let file = fs::File::open(backup_file)?;
1243 let decoder = flate2::read::GzDecoder::new(file);
1244 let mut archive = tar::Archive::new(decoder);
1245 archive.unpack(path)?;
1246 if !path.join(".shard").exists() {
1248 anyhow::bail!("Backup does not contain a valid .shard directory");
1249 }
1250 info!("Restored from {}", backup_file.display());
1251 Ok(())
1252}
1253
1254struct RepoProvider {
1255 store: Store,
1256 shard_dir: std::path::PathBuf,
1257}
1258
1259impl shard_net::p2p::ShardContentProvider for RepoProvider {
1260 fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1261 self.store.get_chunk(id).ok()
1262 }
1263 fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1264 self.store.get_chunk(id).ok()
1265 }
1266 fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1267 let hash = blake3::hash(data);
1268 let hex = hash.to_hex().to_string();
1269 if hex != id {
1270 return false;
1271 }
1272 self.store
1273 .put_chunk(&crate::chunker::Chunk {
1274 hash,
1275 data: data.to_vec(),
1276 offset: 0,
1277 })
1278 .is_ok()
1279 }
1280 fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1281 use ed25519_dalek::Verifier;
1282 let pk_bytes: [u8; 32] = match public_key.try_into() {
1283 Ok(b) => b,
1284 Err(_) => return false,
1285 };
1286 let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1287 Ok(k) => k,
1288 Err(_) => return false,
1289 };
1290 let sig_bytes: [u8; 64] = match signature.try_into() {
1291 Ok(b) => b,
1292 Err(_) => return false,
1293 };
1294 let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1295 if pk.verify(nonce, &sig).is_err() {
1296 return false;
1297 }
1298 if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1300 if !keys.is_empty() {
1301 return keys.contains(&pk);
1302 }
1303 }
1304 true
1305 }
1306 fn repo_public_key(&self) -> Option<Vec<u8>> {
1307 let keys = shard_crypto::KeyPair::load(&self.shard_dir.join("keys")).ok()?;
1308 Some(keys.verifying_key.to_bytes().to_vec())
1309 }
1310}
1311
1312pub async fn share(path: &Path) -> Result<()> {
1313 let shard_dir = path.join(".shard");
1314 if !shard_dir.exists() {
1315 anyhow::bail!("not a shard repository (run `shard init` first)");
1316 }
1317
1318 let mut node = shard_net::p2p::Node::new().await?;
1319
1320 let peers = load_peers(&shard_dir)?;
1322 for peer in peers {
1323 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1324 let _ = node.swarm.dial(addr);
1325 }
1326 }
1327
1328 node.listen("/ip4/0.0.0.0/tcp/0").await?; info!("Sharing repository...");
1333 let store = Store::open(&shard_dir)?;
1334 let provider = RepoProvider {
1335 store,
1336 shard_dir: shard_dir.clone(),
1337 };
1338 node.run(provider).await;
1339
1340 Ok(())
1341}
1342
1343pub async fn sync(path: &Path) -> Result<()> {
1344 let shard_dir = path.join(".shard");
1345 if !shard_dir.exists() {
1346 anyhow::bail!("not a shard repository (run `shard init` first)");
1347 }
1348
1349 let config = load_config(&shard_dir)?;
1350 let repo_id = config
1351 .get("repo_id")
1352 .ok_or_else(|| anyhow::anyhow!("No repo_id in config. Run `shard init` to create one."))?;
1353 let topic_str = format!("/shard/repo/{}", repo_id);
1354 let topic = shard_net::libp2p::gossipsub::IdentTopic::new(topic_str);
1355
1356 let mut node = shard_net::p2p::Node::new().await?;
1357 node.subscribe(&topic)?;
1358 node.listen("/ip4/0.0.0.0/tcp/0").await?;
1359
1360 let peers = load_peers(&shard_dir)?;
1362 for peer in peers {
1363 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1364 let _ = node.swarm.dial(addr);
1365 }
1366 }
1367
1368 let head_commit = branch::resolve_head(&shard_dir)?.1;
1369
1370 if let Some(ref head) = head_commit {
1372 let msg = format!("announce:{}", head);
1373 match node.publish(&topic, msg.as_bytes()) {
1374 Ok(_) => info!("Announced commit {} on sync topic", head),
1375 Err(e) => error!("Initial announce (will retry): {}", e),
1376 }
1377 } else {
1378 info!("No commits to announce");
1379 }
1380
1381 info!("Syncing on topic with peer id: {}", node.local_peer_id());
1382 let _ = std::io::stdout().flush();
1383
1384 let store = Store::open(&shard_dir)?;
1385 let mut provider = RepoProvider {
1386 store,
1387 shard_dir: shard_dir.clone(),
1388 };
1389
1390 let mut interval = tokio::time::interval(Duration::from_secs(5));
1391 let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
1392 HashMap::new();
1393 let path_buf = path.to_path_buf();
1394
1395 loop {
1396 tokio::select! {
1397 _ = tokio::signal::ctrl_c() => {
1398 info!("\nSync shutting down...");
1399 break Ok(());
1400 }
1401 _ = interval.tick() => {
1402 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
1403 let msg = format!("announce:{}", head);
1404 match node.publish(&topic, msg.as_bytes()) {
1405 Ok(_) => info!("Re-announced commit {} on sync topic", head),
1406 Err(e) => error!("Re-announce failed: {}", e),
1407 }
1408 }
1409 }
1410 event = node.swarm.select_next_some() => {
1411 match event {
1412 shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
1413 info!("Listening on {address:?}");
1414 let _ = std::io::stdout().flush();
1415 }
1416 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1417 shard_net::p2p::ShardBehaviourEvent::Mdns(
1418 shard_net::libp2p::mdns::Event::Discovered(list),
1419 ),
1420 ) => {
1421 for (peer_id, multiaddr) in list {
1422 info!("mDNS discovered: {peer_id} {multiaddr}");
1423 address_book.entry(peer_id).or_default().push(multiaddr.clone());
1424 node.swarm
1425 .behaviour_mut()
1426 .gossipsub
1427 .add_explicit_peer(&peer_id);
1428 node.swarm
1429 .behaviour_mut()
1430 .kademlia
1431 .add_address(&peer_id, multiaddr);
1432 }
1433 }
1434 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1435 shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
1436 list,
1437 )),
1438 ) => {
1439 for (peer_id, _multiaddr) in list {
1440 info!("mDNS expired: {peer_id}");
1441 node.swarm
1442 .behaviour_mut()
1443 .gossipsub
1444 .remove_explicit_peer(&peer_id);
1445 }
1446 }
1447 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1448 shard_net::p2p::ShardBehaviourEvent::Gossipsub(
1449 shard_net::libp2p::gossipsub::Event::Message {
1450 propagation_source,
1451 message,
1452 ..
1453 },
1454 ),
1455 ) => {
1456 if let Ok(text) = String::from_utf8(message.data.clone()) {
1457 if let Some(commit_id) = text.strip_prefix("announce:") {
1458 info!(
1459 "Peer {} announced commit: {}",
1460 propagation_source, commit_id
1461 );
1462 let peer = propagation_source;
1463 let commit_id_owned = commit_id.to_string();
1464 let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
1466 if our_head != commit_id_owned {
1467 let msg = format!("announce:{}", our_head);
1468 let _ = node.publish(&topic, msg.as_bytes());
1469 }
1470 if let Some(addrs) = address_book.get(&peer) {
1471 if let Some(addr) = addrs.first() {
1472 let multiaddr_str = format!("{}/p2p/{}", addr, peer);
1473 let path_clone = path_buf.clone();
1474 tokio::spawn(async move {
1475 match pull(&path_clone, &multiaddr_str, &commit_id_owned).await {
1476 Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
1477 Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
1478 }
1479 });
1480 }
1481 }
1482 }
1483 }
1484 }
1485 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1486 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1487 shard_net::libp2p::request_response::Event::Message { peer, message },
1488 ),
1489 ) => {
1490 if let shard_net::libp2p::request_response::Message::Request {
1491 request, channel, ..
1492 } = message
1493 {
1494 info!("Received request from {}", peer);
1495 node.serve_request(&peer, &mut provider, request, channel);
1496 } else {
1497 info!("Received Response from {}", peer);
1498 }
1499 }
1500 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1501 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1502 shard_net::libp2p::request_response::Event::OutboundFailure {
1503 peer, error, ..
1504 },
1505 ),
1506 ) => {
1507 error!("Outbound failure to {}: {:?}", peer, error);
1508 }
1509 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1510 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1511 shard_net::libp2p::request_response::Event::InboundFailure {
1512 peer, error, ..
1513 },
1514 ),
1515 ) => {
1516 error!("Inbound failure from {}: {:?}", peer, error);
1517 }
1518 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1519 shard_net::p2p::ShardBehaviourEvent::Identify(
1520 shard_net::libp2p::identify::Event::Received { peer_id, info },
1521 ),
1522 ) => {
1523 info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
1524 for addr in info.listen_addrs {
1525 address_book.entry(peer_id).or_default().push(addr);
1526 }
1527 let _ = std::io::stdout().flush();
1528 }
1529 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1530 shard_net::p2p::ShardBehaviourEvent::Identify(event),
1531 ) => {
1532 info!("Identify event: {:?}", event);
1533 }
1534 shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
1535 info!("Connection established with {}", peer_id);
1536 if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
1539 address_book.entry(peer_id).or_default().push(address.clone());
1540 }
1541 node.swarm
1542 .behaviour_mut()
1543 .gossipsub
1544 .add_explicit_peer(&peer_id);
1545 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
1547 let msg = format!("announce:{}", head);
1548 let _ = node.publish(&topic, msg.as_bytes());
1549 }
1550 }
1551 shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
1552 local_addr,
1553 send_back_addr,
1554 ..
1555 } => {
1556 info!(
1557 "Incoming connection from {} to {}",
1558 send_back_addr, local_addr
1559 );
1560 }
1561 e => {
1562 info!("Event: {:?}", e);
1563 }
1564 }
1565 }
1566 }
1567 }
1568}
1569
1570pub async fn pull(path: &Path, peer: &str, commit_id: &str) -> Result<()> {
1571 let shard_dir = path.join(".shard");
1572 if !shard_dir.exists() {
1576 init(path, "flat", "zstd", "fixed", None)?;
1577 }
1578
1579 let store = Store::open(&shard_dir)?;
1580
1581 let mut node = shard_net::p2p::Node::new().await?;
1582
1583 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
1585 let peer_id = match multiaddr.iter().last() {
1586 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
1587 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
1588 };
1589
1590 info!("Pulling commit {} from {}...", commit_id, peer);
1592 let commit_data = node
1593 .request_manifest(&multiaddr, peer_id, commit_id.to_string())
1594 .await?;
1595 let hash = blake3::hash(&commit_data);
1596 if hash.to_hex().to_string() != commit_id {
1597 anyhow::bail!("Commit hash mismatch");
1598 }
1599 let chunk = crate::chunker::Chunk {
1600 hash,
1601 data: commit_data.clone(),
1602 offset: 0,
1603 };
1604 store.put_chunk(&chunk)?;
1605
1606 let commit: Commit = serde_json::from_slice(&commit_data)?;
1607 info!("Got commit: {}", commit.message);
1608
1609 if let Some(pk_hex) = &commit.public_key {
1611 let pk_bytes = hex::decode(pk_hex)?;
1612 let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
1613 let mut config = load_config(&shard_dir)?;
1614 config.insert("repo_id".to_string(), repo_id);
1615 save_config(&shard_dir, &config)?;
1616 }
1617
1618 let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
1620 .manifests
1621 .iter()
1622 .map(|id| {
1623 (
1624 id.clone(),
1625 shard_net::protocol::ShardRequest::GetManifest(id.clone()),
1626 )
1627 })
1628 .collect();
1629 let manifest_results = node
1630 .request_parallel(&multiaddr, peer_id, manifest_requests)
1631 .await?;
1632
1633 let mut all_chunk_ids: Vec<String> = Vec::new();
1634 let mut file_manifests: Vec<FileManifest> = Vec::new();
1635 let mut chunk_compression: HashMap<String, String> = HashMap::new();
1637
1638 for (manifest_id, manifest_data) in &manifest_results {
1639 let hash = blake3::hash(manifest_data);
1640 if hash.to_hex().to_string() != *manifest_id {
1641 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
1642 }
1643 let chunk = crate::chunker::Chunk {
1644 hash,
1645 data: manifest_data.clone(),
1646 offset: 0,
1647 };
1648 store.put_chunk(&chunk)?;
1649 let manifest: FileManifest = serde_json::from_slice(manifest_data)?;
1650 info!(
1651 "Fetching file: {} (compression: {})",
1652 manifest.name, manifest.compression
1653 );
1654 for cid in &manifest.chunks {
1655 chunk_compression.insert(cid.clone(), manifest.compression.clone());
1656 }
1657 all_chunk_ids.extend(manifest.chunks.clone());
1658 file_manifests.push(manifest);
1659 }
1660
1661 let needed_chunks: Vec<String> = all_chunk_ids
1663 .into_iter()
1664 .filter(|id| store.get_chunk(id).is_err())
1665 .collect();
1666
1667 if !needed_chunks.is_empty() {
1668 info!("Fetching {} chunks...", needed_chunks.len());
1669 let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
1670 .iter()
1671 .map(|id| {
1672 (
1673 id.clone(),
1674 shard_net::protocol::ShardRequest::GetChunk(id.clone()),
1675 )
1676 })
1677 .collect();
1678 let chunk_results = node
1679 .request_parallel(&multiaddr, peer_id, chunk_requests)
1680 .await?;
1681 for (chunk_id, chunk_data) in &chunk_results {
1682 let compression: Compression = chunk_compression
1684 .get(chunk_id)
1685 .map(|s| s.as_str())
1686 .unwrap_or("none")
1687 .parse()?;
1688 let decompressed = compression.decompress(chunk_data)?;
1690 let hash = blake3::hash(&decompressed);
1691 if hash.to_hex().to_string() != *chunk_id {
1692 anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
1693 }
1694 let chunk = crate::chunker::Chunk {
1696 hash,
1697 data: chunk_data.clone(),
1698 offset: 0,
1699 };
1700 store.put_chunk(&chunk)?;
1701 }
1702 }
1703
1704 for manifest in &file_manifests {
1706 let compression = manifest.compression.parse::<Compression>()?;
1707 let mut file_data = Vec::new();
1708 for chunk_id in &manifest.chunks {
1709 let compressed = store.get_chunk(chunk_id)?;
1710 let decompressed = compression.decompress(&compressed)?;
1711 file_data.extend_from_slice(&decompressed);
1712 }
1713 fs::write(path.join(&manifest.name), file_data)?;
1714 info!(
1715 "Reconstructed file: {} ({} bytes)",
1716 manifest.name, manifest.size
1717 );
1718 }
1719
1720 info!("Pull complete.");
1721 Ok(())
1722}
1723
1724pub async fn push(path: &Path, peer: &str) -> Result<()> {
1725 let shard_dir = path.join(".shard");
1726 if !shard_dir.exists() {
1727 anyhow::bail!("not a shard repository (run `shard init` first)");
1728 }
1729
1730 let (_, head_id) = branch::resolve_head(&shard_dir)?;
1731 let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
1732
1733 let store = Store::open(&shard_dir)?;
1734
1735 let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
1737 std::collections::BTreeMap::new();
1738
1739 let mut seen = std::collections::HashSet::new();
1741 let mut stack = vec![head_id.clone()];
1742 while let Some(cid) = stack.pop() {
1743 if !seen.insert(cid.clone()) {
1744 continue;
1745 }
1746 if let Ok(data) = store.get_chunk(&cid) {
1747 objects.insert(cid, data.clone());
1748 if let Ok(commit) = serde_json::from_slice::<Commit>(&data) {
1749 for mid in &commit.manifests {
1750 if let Ok(manifest_data) = store.get_chunk(mid) {
1751 objects.insert(mid.clone(), manifest_data.clone());
1752 if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&manifest_data)
1753 {
1754 for cid in &manifest.chunks {
1755 if let Ok(chunk_data) = store.get_chunk(cid) {
1756 objects.insert(cid.clone(), chunk_data);
1757 }
1758 }
1759 }
1760 }
1761 }
1762 for parent in &commit.parents {
1763 stack.push(parent.clone());
1764 }
1765 }
1766 }
1767 }
1768
1769 info!(
1770 "Pushing {} objects ({} bytes)...",
1771 objects.len(),
1772 objects.values().map(|v| v.len() as u64).sum::<u64>()
1773 );
1774
1775 let mut node = shard_net::p2p::Node::new().await?;
1777 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
1778 let peer_id = match multiaddr.iter().last() {
1779 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
1780 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
1781 };
1782
1783 for (id, data) in &objects {
1784 node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
1785 .await?;
1786 }
1787
1788 info!("Push complete ({} objects).", objects.len());
1789 Ok(())
1790}