1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod encryption;
6pub mod index;
7pub mod keychain;
8pub mod manifest;
9pub mod metadata;
10pub mod partial;
11pub mod store;
12pub mod wal;
13
14use crate::commit::Commit;
15use crate::compression::Compression;
16use crate::index::Index;
17use crate::keychain::KeyRotation;
18use crate::manifest::FileManifest;
19use crate::store::Store;
20use anyhow::Result;
21use ed25519_dalek::{Signer, Verifier};
22use metadata::MetadataFormat;
23use serde::Serialize;
24use shard_crypto::KeyPair;
25use shard_net::libp2p::futures::StreamExt;
26use similar::TextDiff;
27use std::collections::HashMap;
28use std::fs;
29use std::io::Write;
30use std::path::Path;
31use std::path::PathBuf;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33use tracing::{error, info, warn};
34
/// Reads `<path>/.shardignore` and returns its active patterns.
///
/// Each line is trimmed; blank lines and lines starting with `#` are dropped.
/// A missing file — or one that cannot be read — yields an empty list.
fn load_shardignore(path: &Path) -> Vec<String> {
    let ignore_file = path.join(".shardignore");
    if !ignore_file.exists() {
        return Vec::new();
    }
    let contents = fs::read_to_string(&ignore_file).unwrap_or_default();
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(String::from)
        .collect()
}
48
/// Returns `true` when `path` (relative to the walk root) is excluded by any
/// `.shardignore` pattern.
///
/// Supported pattern forms:
/// * `*` on its own ignores everything.
/// * A trailing `/` is dropped, so `build/` and `build` are equivalent.
/// * A leading `/` anchors the pattern to the first segments of `path`.
/// * A leading `**/` (or no prefix at all) lets the pattern match at any depth.
/// * Inside a segment, `*` matches any run of characters and `?` exactly one;
///   wildcards never cross a `/`.
///
/// A pattern matches when its segments equal a contiguous run of `path`'s
/// segments. Because a run may end before the last segment, every file inside
/// an ignored directory is ignored too.
fn matches_ignore(path: &Path, patterns: &[String]) -> bool {
    // Compare whole path segments rather than raw strings so that `build`
    // never matches `rebuild` or `build.rs`.
    let segments: Vec<String> = path
        .components()
        .filter_map(|c| match c {
            std::path::Component::Normal(s) => Some(s.to_string_lossy().into_owned()),
            _ => None,
        })
        .collect();

    patterns.iter().any(|raw| {
        let pattern = raw.trim_end_matches('/');
        if pattern == "*" {
            return true;
        }
        let (mut anchored, mut body) = match pattern.strip_prefix('/') {
            Some(rest) => (true, rest),
            None => (false, pattern),
        };
        while let Some(rest) = body.strip_prefix("**/") {
            body = rest;
            anchored = false;
        }
        let parts: Vec<&str> = body.split('/').filter(|p| !p.is_empty()).collect();
        // An empty pattern (e.g. a lone `/`) must not match anything — in
        // particular not the empty root path of a directory walk.
        if parts.is_empty() || parts.len() > segments.len() {
            return false;
        }
        let window_matches = |start: usize| {
            parts
                .iter()
                .zip(&segments[start..])
                .all(|(pat, seg)| ignore_segment_matches(pat, seg))
        };
        if anchored {
            window_matches(0)
        } else {
            (0..=segments.len() - parts.len()).any(window_matches)
        }
    })
}

/// Matches one path segment against a glob where `*` matches any run of
/// characters (including none) and `?` matches exactly one character.
fn ignore_segment_matches(pattern: &str, text: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();
    let (mut p, mut t) = (0, 0);
    // Most recent `*` in `pat` and the `txt` position it is currently covering up to.
    let mut backtrack: Option<(usize, usize)> = None;
    while t < txt.len() {
        if p < pat.len() && (pat[p] == '?' || pat[p] == txt[t]) {
            p += 1;
            t += 1;
        } else if p < pat.len() && pat[p] == '*' {
            backtrack = Some((p, t));
            p += 1;
        } else if let Some((star_p, star_t)) = backtrack {
            // Let the last `*` absorb one more character and retry.
            p = star_p + 1;
            t = star_t + 1;
            backtrack = Some((star_p, star_t + 1));
        } else {
            return false;
        }
    }
    pat[p..].iter().all(|&c| c == '*')
}
66
67pub fn init(
68 path: &Path,
69 backend: &str,
70 compression_algo: &str,
71 chunker_mode: &str,
72 chunk_size: Option<u64>,
73 is_private: bool,
74 json: bool,
75) -> Result<()> {
76 let shard_dir = path.join(".shard");
77 if shard_dir.exists() {
78 anyhow::bail!(
79 "repository already initialized at {} (run `shard status` to confirm)",
80 shard_dir.display()
81 );
82 }
83 fs::create_dir_all(shard_dir.join("objects"))?;
84 fs::create_dir_all(shard_dir.join("keys"))?;
85 fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
86 branch::set_head_branch(&shard_dir, "main")?;
87
88 let keys = KeyPair::generate();
89 keys.save(&shard_dir.join("keys"))?;
90 if let Some(global) = global_keys_dir() {
91 fs::create_dir_all(&global).ok();
92 let _ = keys.save(&global);
93 let _ = keychain::init_keychain(&global);
94 }
95 keychain::init_keychain(&shard_dir.join("keys"))?;
96
97 let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
100 let repo_id = blake3::hash(&pubkey).to_hex().to_string();
101 let mut config = load_config(&shard_dir)?;
102
103 if is_private {
104 let key = encryption::generate_repo_key();
105 encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
106 config.insert("private".to_string(), "true".to_string());
107 }
108 config.insert("repo_id".to_string(), repo_id);
109 config.insert("storage_backend".to_string(), backend.to_string());
110 config.insert(
111 "serialization_format".to_string(),
112 MetadataFormat::Json.config_value().to_string(),
113 );
114 config.insert("compression".to_string(), compression_algo.to_string());
115 config.insert("chunker_mode".to_string(), chunker_mode.to_string());
116 match chunker_mode {
117 "rabin" => {
118 let chunk_size = chunk_size.unwrap_or(4_194_304);
119 let min = chunk_size / 4;
120 let max = chunk_size * 2;
121 config.insert("chunk_min".to_string(), min.to_string());
122 config.insert("chunk_avg".to_string(), chunk_size.to_string());
123 config.insert("chunk_max".to_string(), max.to_string());
124 }
125 _ => {
126 let cs = chunk_size.unwrap_or(4_194_304);
127 config.insert("chunk_size".to_string(), cs.to_string());
128 }
129 }
130 save_config(&shard_dir, &config)?;
131
132 let chunker_desc = if chunker_mode == "rabin" {
133 format!(
134 "rabin (avg {} bytes)",
135 config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
136 )
137 } else {
138 format!(
139 "fixed ({} bytes)",
140 config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
141 )
142 };
143 if json {
144 info!(
145 "{}",
146 serde_json::to_string(&serde_json::json!({
147 "path": shard_dir.display().to_string(),
148 "backend": backend,
149 "compression": compression_algo,
150 "chunker": chunker_desc,
151 "private": is_private,
152 }))?
153 );
154 } else {
155 info!(
156 "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
157 shard_dir.display(),
158 backend,
159 compression_algo,
160 chunker_desc,
161 );
162 }
163 Ok(())
164}
165
/// Computes the repository-relative name under which `file_path` is tracked.
///
/// Both paths are canonicalised when possible; if canonicalisation fails the
/// path is used as given. When the file does not lie under `repo_root`, only
/// its final component (file name) is returned, or an empty string if it has none.
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let resolve = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let root = resolve(repo_root);
    let target = resolve(file_path);
    match target.strip_prefix(&root) {
        Ok(rel) => rel.to_string_lossy().to_string(),
        Err(_) => match file_path.file_name() {
            Some(name) => name.to_string_lossy().to_string(),
            None => String::new(),
        },
    }
}
182
/// Guesses a MIME type from the file extension (case-insensitive).
///
/// Returns `None` for files without a UTF-8 extension or with an extension
/// that is not in the table below. Model checkpoint formats (`pt`, `pth`,
/// `ckpt`, `safetensors`) map to the custom `application/x-model` type.
fn detect_content_type(file_path: &Path) -> Option<String> {
    let extension = file_path.extension().and_then(|ext| ext.to_str())?.to_lowercase();
    let mime: Option<&str> = match extension.as_str() {
        // Text and source code
        "txt" => Some("text/plain"),
        "csv" => Some("text/csv"),
        "md" => Some("text/markdown"),
        "html" | "htm" => Some("text/html"),
        "py" => Some("text/x-python"),
        "rs" => Some("text/x-rust"),
        "ts" => Some("text/x-typescript"),
        // Images
        "png" => Some("image/png"),
        "jpg" | "jpeg" => Some("image/jpeg"),
        "gif" => Some("image/gif"),
        // Structured data and documents
        "json" => Some("application/json"),
        "yaml" | "yml" => Some("application/x-yaml"),
        "toml" => Some("application/toml"),
        "xml" => Some("application/xml"),
        "pdf" => Some("application/pdf"),
        "js" => Some("application/javascript"),
        "wasm" => Some("application/wasm"),
        // Archives and binaries
        "zip" => Some("application/zip"),
        "tar" => Some("application/x-tar"),
        "gz" => Some("application/gzip"),
        "bin" => Some("application/octet-stream"),
        "pt" | "pth" | "ckpt" | "safetensors" => Some("application/x-model"),
        _ => None,
    };
    mime.map(str::to_string)
}
212
213#[allow(clippy::too_many_arguments)]
214fn add_file(
215 repo_root: &Path,
216 file_path: &Path,
217 store: &Store,
218 index: &mut Index,
219 compression: &Compression,
220 chunker_mode: &chunker::ChunkerMode,
221 cipher: Option<&encryption::RepoCipher>,
222 _json: bool,
223) -> Result<()> {
224 let file = fs::File::open(file_path)?;
225 let mut chunker = match chunker_mode {
226 chunker::ChunkerMode::Fixed { chunk_size } => {
227 chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
228 }
229 chunker::ChunkerMode::Rabin { min, avg, max } => {
230 chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
231 }
232 };
233 let mut chunk_hashes = Vec::new();
234 let mut total_size = 0;
235
236 while let Some(chunk) = chunker.next_chunk()? {
237 let hash = chunk.hash;
238 let compressed_data = compression.compress(&chunk.data)?;
239 let stored_data = match cipher {
240 Some(c) => c.encrypt(&compressed_data),
241 None => compressed_data,
242 };
243 let stored = crate::chunker::Chunk {
244 hash,
245 data: stored_data,
246 offset: chunk.offset,
247 };
248 store.put_chunk(&stored)?;
249 chunk_hashes.push(hash.to_hex().to_string());
250 total_size += chunk.data.len() as u64;
251 }
252
253 let name = relative_path(repo_root, file_path);
254 let manifest = FileManifest {
255 name: name.clone(),
256 size: total_size,
257 chunks: chunk_hashes.clone(),
258 content_type: detect_content_type(file_path),
259 compression: compression.as_str().to_string(),
260 merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
261 created_by: None,
262 created_at: None,
263 signature: None,
264 };
265
266 index.files.insert(name.clone(), manifest);
267 if !_json {
268 info!("Added {} ({})", name, total_size);
269 }
270 Ok(())
271}
272
273pub fn recover(path: &Path, json: bool) -> Result<()> {
274 let shard_dir = path.join(".shard");
275 if !shard_dir.exists() {
276 anyhow::bail!("not a shard repository (run `shard init` first)");
277 }
278 wal::recover(&shard_dir)?;
279 if json {
280 info!(
281 "{}",
282 serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
283 );
284 } else {
285 info!("Recovery complete.");
286 }
287 Ok(())
288}
289
290pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
291 let shard_dir = path.join(".shard");
292 if !shard_dir.exists() {
293 anyhow::bail!("not a shard repository (run `shard init` first)");
294 }
295
296 wal::recover(&shard_dir)?;
297
298 let config = load_config(&shard_dir)?;
299 let compression: Compression = config
300 .get("compression")
301 .map(|s| s.as_str())
302 .unwrap_or("zstd")
303 .parse()?;
304
305 let chunker_mode = chunker::ChunkerMode::from_config(&config);
306 let fmt = MetadataFormat::from_config(&config);
307
308 let store = Store::open(&shard_dir)?;
309 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
310
311 let cipher = maybe_load_cipher(&shard_dir)?;
312 let ignore_patterns = load_shardignore(path);
313
314 if file_path.is_dir() {
315 for entry in walkdir::WalkDir::new(file_path)
316 .into_iter()
317 .filter_entry(|e| {
318 let name = e.file_name().to_string_lossy();
319 if name == ".shard" || name == ".git" {
320 return false;
321 }
322 let rel = e.path().strip_prefix(file_path).unwrap_or(e.path());
323 if matches_ignore(rel, &ignore_patterns) {
324 return false;
325 }
326 true
327 })
328 {
329 let entry = entry?;
330 if entry.file_type().is_file() {
331 add_file(
332 path,
333 entry.path(),
334 &store,
335 &mut index,
336 &compression,
337 &chunker_mode,
338 cipher.as_ref(),
339 json,
340 )?;
341 }
342 }
343 } else {
344 add_file(
345 path,
346 file_path,
347 &store,
348 &mut index,
349 &compression,
350 &chunker_mode,
351 cipher.as_ref(),
352 json,
353 )?;
354 }
355
356 index.save(&shard_dir.join("index"), &fmt)?;
357 if json {
358 info!(
359 "{}",
360 serde_json::to_string(&serde_json::json!({"status": "added"}))?
361 );
362 }
363 Ok(())
364}
365
366pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
367 let shard_dir = path.join(".shard");
368 if !shard_dir.exists() {
369 anyhow::bail!("not a shard repository (run `shard init` first)");
370 }
371
372 wal::recover(&shard_dir)?;
374
375 let config = load_config(&shard_dir)?;
376 let fmt = MetadataFormat::from_config(&config);
377
378 let store = Store::open(&shard_dir)?;
379 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
380
381 if index.files.is_empty() {
382 anyhow::bail!("nothing to commit (stage files with `shard add` first)");
383 }
384
385 let head_path = shard_dir.join("HEAD");
386
387 let wal = wal::Wal::new(&shard_dir);
389 let head_backup = fs::read_to_string(&head_path).ok();
390 let index_backup = fs::read(shard_dir.join("index"))?;
391 wal.append(&wal::WalEntry::CommitBegin {
392 head_backup,
393 index_backup,
394 })?;
395
396 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
398 let keys = load_keypair(&shard_dir)?;
399 let signing_key = keys.signing_key;
400 let mut manifest_ids = Vec::new();
401 for manifest in index.files.values_mut() {
402 manifest.created_by = Some(author.to_string());
403 manifest.created_at = Some(timestamp);
404
405 let mut unsigned = manifest.clone();
406 unsigned.signature = None;
407 let canonical = metadata::serialize_for_signing(&unsigned);
408 let sig = signing_key.sign(&canonical);
409 manifest.signature = Some(hex::encode(sig.to_bytes()));
410
411 let encoded = metadata::serialize(manifest, &fmt);
412 let hash = blake3::hash(&encoded);
413 let chunk = crate::chunker::Chunk {
414 hash,
415 data: encoded,
416 offset: 0,
417 };
418 store.put_chunk(&chunk)?;
419 manifest_ids.push(hash.to_hex().to_string());
420 }
421 manifest_ids.sort();
422
423 let mut parents = Vec::new();
425 let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
426 if let Some(pid) = parent_id {
427 parents.push(pid);
428 }
429
430 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
432 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
433 let mut commit = Commit {
434 commit_id: String::new(),
435 parents,
436 manifests: manifest_ids,
437 author: author.to_string(),
438 message: message.to_string(),
439 timestamp,
440 public_key: Some(public_key_hex),
441 signature: None,
442 key_id,
443 };
444
445 let json_unsigned = metadata::serialize_for_signing(&commit);
447 let signature = signing_key.sign(&json_unsigned);
448 commit.signature = Some(hex::encode(signature.to_bytes()));
449
450 let encoded = metadata::serialize(&commit, &fmt);
452 let hash = blake3::hash(&encoded);
453 let chunk = crate::chunker::Chunk {
454 hash,
455 data: encoded,
456 offset: 0,
457 };
458 store.put_chunk(&chunk)?;
459
460 let commit_id = hash.to_hex().to_string();
462 if has_dag_cycle(&store, &commit.parents, &commit_id)? {
463 anyhow::bail!(
464 "Cycle detected: commit {} is already an ancestor of one or more parents",
465 commit_id
466 );
467 }
468
469 if let Some(ref branch_name) = current_branch {
471 branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
472 branch::set_head_branch(&shard_dir, branch_name)?;
473 } else {
474 fs::write(&head_path, &commit_id)?;
475 }
476
477 index.files.clear();
479 index.save(&shard_dir.join("index"), &fmt)?;
480
481 wal.append(&wal::WalEntry::CommitEnd)?;
483 wal.truncate()?;
484
485 if json {
486 info!(
487 "{}",
488 serde_json::to_string(&serde_json::json!({
489 "commit_id": commit_id,
490 "message": message,
491 }))?
492 );
493 } else {
494 info!("Committed {} ({})", commit_id, message);
495 }
496 Ok(())
497}
498
499pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
500 let shard_dir = path.join(".shard");
501 if !shard_dir.exists() {
502 anyhow::bail!("not a shard repository (run `shard init` first)");
503 }
504
505 if commit_id.len() < 2 {
506 anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
507 }
508 let store = Store::open(&shard_dir)?;
509 let cipher = maybe_load_cipher(&shard_dir)?;
510 let commit_data = store.get_chunk(commit_id)?;
511
512 let stored_hash = blake3::hash(&commit_data);
514 if stored_hash.to_hex().to_string() != commit_id {
515 anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
516 }
517
518 let commit: Commit = metadata::deserialize(&commit_data)?;
519
520 let mut sig_verified = false;
521 let mut files_checked = 0u64;
522
523 if let Some(kid) = &commit.key_id {
525 if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
526 anyhow::bail!("Keychain verification failed: {}", e);
527 } else if !json {
528 info!("Keychain: key {} was active at commit time.", kid);
529 }
530 }
531
532 if let Some(sig_hex) = &commit.signature {
533 let verifying_key = if let Some(pk_hex) = &commit.public_key {
534 let pk_bytes = hex::decode(pk_hex)?;
535 ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
536 } else {
537 let pub_bytes = load_public_key(&shard_dir)?;
538 ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
539 };
540
541 let mut unsigned_commit = commit.clone();
542 unsigned_commit.signature = None;
543 let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);
544
545 let sig_bytes = hex::decode(sig_hex)?;
546 let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
547
548 verifying_key.verify(&json_unsigned, &signature)?;
549 sig_verified = true;
550 if !json {
551 info!("Signature verified.");
552 }
553 } else if !json {
554 info!("Warning: Commit is unsigned.");
555 }
556
557 for manifest_id in &commit.manifests {
558 let manifest_data = store.get_chunk(manifest_id)?;
559 let hash = blake3::hash(&manifest_data);
560 if hash.to_hex().to_string() != *manifest_id {
561 anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
562 }
563
564 let manifest: FileManifest = metadata::deserialize(&manifest_data)?;
565
566 if let Some(sig_hex) = &manifest.signature {
568 let pk_bytes = if let Some(pk_hex) = &commit.public_key {
569 hex::decode(pk_hex)?
570 } else {
571 load_public_key(&shard_dir)?
572 };
573 let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
574 let mut unsigned = manifest.clone();
575 unsigned.signature = None;
576 let canonical = metadata::serialize_for_signing(&unsigned);
577 let sig_bytes = hex::decode(sig_hex)?;
578 let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
579 vk.verify(&canonical, &sig)?;
580 if !json {
581 info!(" Manifest signature verified for: {}", manifest.name);
582 }
583 }
584
585 let compression = manifest.compression.parse::<Compression>()?;
586 if !json {
587 info!(
588 "Verifying file: {} (compression: {})",
589 manifest.name, manifest.compression
590 );
591 }
592
593 if let Some(ref mr) = manifest.merkle_root {
595 let computed = FileManifest::merkle_root(&manifest.chunks);
596 if mr != &computed {
597 anyhow::bail!(
598 "merkle root mismatch for '{}': manifest says {} but computed {}",
599 manifest.name,
600 mr,
601 computed
602 );
603 }
604 }
605
606 for chunk_id in &manifest.chunks {
607 let chunk_data = store.get_chunk(chunk_id)?;
608 let decrypted = match &cipher {
609 Some(c) => c.decrypt(&chunk_data)?,
610 None => chunk_data,
611 };
612 let decompressed = compression.decompress(&decrypted)?;
613 let hash = blake3::hash(&decompressed);
614 if hash.to_hex().to_string() != *chunk_id {
615 anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
616 }
617 }
618 files_checked += 1;
619 }
620
621 if json {
622 info!(
623 "{}",
624 serde_json::to_string(&serde_json::json!({
625 "commit_id": commit_id,
626 "verified": true,
627 "signature_verified": sig_verified,
628 "files_checked": files_checked,
629 }))?
630 );
631 } else {
632 info!("Verification successful.");
633 }
634 Ok(())
635}
636
637fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
638 if commit_id.len() < 2 {
639 anyhow::bail!(
640 "commit id too short (got {} chars, need at least 2): '{}'",
641 commit_id.len(),
642 commit_id
643 );
644 };
645 let data = store.get_chunk(commit_id)?;
646 let mut commit: Commit = metadata::deserialize(&data)?;
647 commit.commit_id = commit_id.to_string();
648 Ok(commit)
649}
650
651fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
652 let mut seen = std::collections::HashSet::new();
653 let mut stack: Vec<String> = parents.to_vec();
654 while let Some(cid) = stack.pop() {
655 if cid == commit_id {
656 return Ok(true);
657 }
658 if !seen.insert(cid.clone()) {
659 continue;
660 }
661 if let Ok(data) = store.get_chunk(&cid) {
662 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
663 for p in &commit.parents {
664 stack.push(p.clone());
665 }
666 }
667 }
668 }
669 Ok(false)
670}
671
672#[derive(Serialize)]
673pub struct LogEntry {
674 pub commit_id: String,
675 pub parents: Vec<String>,
676 pub manifests: Vec<String>,
677 pub author: String,
678 pub message: String,
679 pub timestamp: u64,
680 pub signature: Option<String>,
681}
682
683impl From<Commit> for LogEntry {
684 fn from(c: Commit) -> Self {
685 LogEntry {
686 commit_id: c.commit_id,
687 parents: c.parents,
688 manifests: c.manifests,
689 author: c.author,
690 message: c.message,
691 timestamp: c.timestamp,
692 signature: c.signature,
693 }
694 }
695}
696
697pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
698 let shard_dir = path.join(".shard");
699 if !shard_dir.exists() {
700 anyhow::bail!("not a shard repository (run `shard init` first)");
701 }
702
703 let store = Store::open(&shard_dir)?;
704
705 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
706 let head = head_commit
707 .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
708
709 let mut entries: Vec<LogEntry> = Vec::new();
710 let mut seen = std::collections::HashSet::new();
711 let mut stack = vec![head];
712
713 while let Some(cid) = stack.pop() {
714 if !seen.insert(cid.clone()) {
715 continue;
716 }
717 let commit = load_commit(&store, &cid)?;
718 for parent in &commit.parents {
719 stack.push(parent.clone());
720 }
721 entries.push(commit.into());
722 }
723
724 if json {
725 info!("{}", serde_json::to_string_pretty(&entries)?);
726 } else {
727 for entry in &entries {
728 let ts = {
729 let secs = entry.timestamp as i64;
730 let tm = time::OffsetDateTime::from_unix_timestamp(secs)
731 .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
732 tm.format(&time::format_description::well_known::Rfc3339)
733 .unwrap_or_else(|_| entry.timestamp.to_string())
734 };
735 info!("commit {}", entry.commit_id);
736 if !entry.parents.is_empty() {
737 info!("parents: {}", entry.parents.join(" "));
738 }
739 info!("author: {}", entry.author);
740 info!("date: {}", ts);
741 info!("");
742 for line in entry.message.lines() {
743 info!(" {}", line);
744 }
745 info!("");
746 }
747 }
748
749 Ok(())
750}
751
752fn reconstruct_file(
753 store: &Store,
754 manifest: &FileManifest,
755 cipher: Option<&encryption::RepoCipher>,
756) -> Result<Vec<u8>> {
757 let compression: Compression = manifest.compression.parse()?;
758 let mut data = Vec::new();
759 for chunk_id in &manifest.chunks {
760 let chunk_data = store.get_chunk(chunk_id)?;
761 let decrypted = match cipher {
762 Some(c) => c.decrypt(&chunk_data)?,
763 None => chunk_data,
764 };
765 let decompressed = compression.decompress(&decrypted)?;
766 data.extend_from_slice(&decompressed);
767 }
768 Ok(data)
769}
770
771pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
772 let shard_dir = path.join(".shard");
773 if !shard_dir.exists() {
774 anyhow::bail!("not a shard repository (run `shard init` first)");
775 }
776
777 let store = Store::open(&shard_dir)?;
778 let cipher = maybe_load_cipher(&shard_dir)?;
779
780 let cid_b = match commit_b {
781 Some(c) => branch::resolve_rev(&shard_dir, c)?,
782 None => {
783 let (_, head) = branch::resolve_head(&shard_dir)?;
784 head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
785 }
786 };
787 let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;
788
789 let commit1 = load_commit(&store, &cid_a)?;
790 let commit2 = load_commit(&store, &cid_b)?;
791
792 let mut files1: HashMap<String, FileManifest> = HashMap::new();
793 for mid in &commit1.manifests {
794 let data = store.get_chunk(mid)?;
795 let m: FileManifest = metadata::deserialize(&data)?;
796 files1.insert(m.name.clone(), m);
797 }
798
799 let mut files2: HashMap<String, FileManifest> = HashMap::new();
800 for mid in &commit2.manifests {
801 let data = store.get_chunk(mid)?;
802 let m: FileManifest = metadata::deserialize(&data)?;
803 files2.insert(m.name.clone(), m);
804 }
805
806 let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
807 all_names.sort();
808 all_names.dedup();
809
810 let mut changes: Vec<serde_json::Value> = Vec::new();
811 let mut diff_found = false;
812
813 for name in all_names {
814 match (files1.get(name), files2.get(name)) {
815 (None, Some(manifest)) => {
816 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
817 let text = String::from_utf8_lossy(&content);
818 diff_found = true;
819 if json {
820 changes.push(serde_json::json!({
821 "type": "added",
822 "file": name,
823 "lines": text.lines().collect::<Vec<_>>(),
824 }));
825 } else {
826 info!("--- /dev/null");
827 info!("+++ b/{}", name);
828 let lines: Vec<&str> = text.lines().collect();
829 info!("@@ -0,0 +1,{} @@", lines.len());
830 for line in &lines {
831 info!("+{}", line);
832 }
833 }
834 }
835 (Some(manifest), None) => {
836 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
837 let text = String::from_utf8_lossy(&content);
838 diff_found = true;
839 if json {
840 changes.push(serde_json::json!({
841 "type": "removed",
842 "file": name,
843 "lines": text.lines().collect::<Vec<_>>(),
844 }));
845 } else {
846 info!("--- a/{}", name);
847 info!("+++ /dev/null");
848 let lines: Vec<&str> = text.lines().collect();
849 info!("@@ -1,{} +0,0 @@", lines.len());
850 for line in &lines {
851 info!("-{}", line);
852 }
853 }
854 }
855 (Some(ma), Some(mb)) => {
856 if ma.chunks == mb.chunks {
857 continue;
858 }
859 let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
860 let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
861 diff_found = true;
862 if json {
863 let text_a = String::from_utf8_lossy(&content_a);
864 let text_b = String::from_utf8_lossy(&content_b);
865 changes.push(serde_json::json!({
866 "type": "modified",
867 "file": name,
868 "old_lines": text_a.lines().collect::<Vec<_>>(),
869 "new_lines": text_b.lines().collect::<Vec<_>>(),
870 }));
871 } else {
872 let text_a = String::from_utf8_lossy(&content_a);
873 let text_b = String::from_utf8_lossy(&content_b);
874 let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
875 let mut buf: Vec<u8> = Vec::new();
876 diff.unified_diff()
877 .header(&format!("a/{}", name), &format!("b/{}", name))
878 .to_writer(&mut buf)
879 .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
880 let output = String::from_utf8_lossy(&buf);
881 for line in output.lines() {
882 info!("{}", line);
883 }
884 }
885 }
886 (None, None) => {}
887 }
888 }
889
890 if !diff_found {
891 if json {
892 info!(
893 "{}",
894 serde_json::to_string(
895 &serde_json::json!({"changes": changes, "message": "no differences"})
896 )?
897 );
898 } else {
899 info!("No differences between the commits.");
900 }
901 return Ok(());
902 }
903
904 if json {
905 info!(
906 "{}",
907 serde_json::to_string(&serde_json::json!({"changes": changes}))?
908 );
909 }
910
911 Ok(())
912}
913
914pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
915 let shard_dir = path.join(".shard");
916 if !shard_dir.exists() {
917 anyhow::bail!("not a shard repository (run `shard init` first)");
918 }
919
920 let store = Store::open(&shard_dir)?;
921 let cipher = maybe_load_cipher(&shard_dir)?;
922
923 let branch_path = shard_dir.join("refs").join("heads").join(target);
926 let commit_id = if branch_path.exists() {
927 fs::read_to_string(&branch_path)?.trim().to_string()
928 } else {
929 target.to_string()
930 };
931
932 let commit = load_commit(&store, &commit_id)?;
934
935 if branch_path.exists() {
937 branch::set_head_branch(&shard_dir, target)?;
938 } else {
939 branch::set_head_commit(&shard_dir, target)?;
940 }
941 let mut files = Vec::new();
942
943 for manifest_id in &commit.manifests {
944 let data = store.get_chunk(manifest_id)?;
945 let hash = blake3::hash(&data);
946 if hash.to_hex().to_string() != *manifest_id {
947 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
948 }
949 let manifest: FileManifest = metadata::deserialize(&data)?;
950 let compression = manifest.compression.parse::<Compression>()?;
951 if !json {
952 info!(
953 "Checking out file: {} (compression: {})",
954 manifest.name, manifest.compression
955 );
956 }
957
958 let mut file_data = Vec::new();
959 for chunk_id in &manifest.chunks {
960 let chunk_data = store.get_chunk(chunk_id)?;
961 let decrypted = match &cipher {
962 Some(c) => c.decrypt(&chunk_data)?,
963 None => chunk_data,
964 };
965 let decompressed = compression.decompress(&decrypted)?;
966 file_data.extend_from_slice(&decompressed);
967 }
968 fs::write(path.join(&manifest.name), file_data)?;
969 if !json {
970 info!(" -> {}", manifest.name);
971 }
972 files.push(manifest.name);
973 }
974
975 if json {
976 info!(
977 "{}",
978 serde_json::to_string(&serde_json::json!({
979 "commit_id": commit_id,
980 "files": files,
981 }))?
982 );
983 } else {
984 info!("Checkout complete.");
985 }
986 Ok(())
987}
988
989pub fn status(path: &Path, json: bool) -> Result<()> {
990 let shard_dir = path.join(".shard");
991 if !shard_dir.exists() {
992 anyhow::bail!("not a shard repository (run `shard init` first)");
993 }
994
995 let config = load_config(&shard_dir)?;
996 let fmt = MetadataFormat::from_config(&config);
997
998 let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
999 let mut commit_id: Option<String> = None;
1000 if let Some(cid) = head_commit {
1001 commit_id = Some(cid);
1002 if !json {
1003 if let Some(ref branch) = current_branch {
1004 info!("On branch: {}", branch);
1005 } else {
1006 info!("HEAD detached at {}", commit_id.as_ref().unwrap());
1007 }
1008 }
1009 } else if !json {
1010 info!("No commits yet.");
1011 }
1012
1013 let index = Index::load(&shard_dir.join("index"), &fmt)?;
1014 let staged: Vec<String> = index.files.keys().cloned().collect();
1015 if !json {
1016 if staged.is_empty() {
1017 info!("Nothing staged.");
1018 } else {
1019 info!("\nStaged files:");
1020 for name in &staged {
1021 info!(" {} (to be committed)", name);
1022 }
1023 }
1024 }
1025
1026 let store = Store::open(&shard_dir)?;
1027 let mut deleted = Vec::new();
1028 let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
1029 let mut names = std::collections::HashSet::new();
1030 if let Ok(commit) = load_commit(&store, head) {
1031 for manifest_id in &commit.manifests {
1032 if let Ok(data) = store.get_chunk(manifest_id) {
1033 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1034 let file_path = path.join(&manifest.name);
1035 if !file_path.exists() {
1036 deleted.push(manifest.name.clone());
1037 }
1038 names.insert(manifest.name);
1039 }
1040 }
1041 }
1042 }
1043 names
1044 } else {
1045 std::collections::HashSet::new()
1046 };
1047
1048 if !json && !deleted.is_empty() {
1049 info!("\nDeleted files:");
1050 for name in &deleted {
1051 info!(" {} (deleted)", name);
1052 }
1053 }
1054
1055 let mut untracked = Vec::new();
1056 let ignore_patterns = load_shardignore(path);
1057 for entry in walkdir::WalkDir::new(path)
1058 .min_depth(1)
1059 .into_iter()
1060 .filter_map(|e| e.ok())
1061 {
1062 if entry.file_type().is_file() || entry.file_type().is_dir() {
1063 let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1064 let name = rel_path.to_string_lossy().to_string();
1065 let is_hidden = rel_path.components().any(|c| {
1066 let name = c.as_os_str().to_string_lossy();
1067 name == ".shard" || name == ".git"
1068 });
1069 if !is_hidden
1070 && entry.path() != shard_dir
1071 && !entry.path().starts_with(&shard_dir)
1072 && !index.files.contains_key(&name)
1073 && !tracked_names.contains(&name)
1074 && entry.file_type().is_file()
1075 && !matches_ignore(rel_path, &ignore_patterns)
1076 {
1077 untracked.push(name);
1078 }
1079 }
1080 }
1081 if !json && !untracked.is_empty() {
1082 info!("\nUntracked files:");
1083 for name in &untracked {
1084 info!(" {}", name);
1085 }
1086 }
1087
1088 if json {
1089 info!(
1090 "{}",
1091 serde_json::to_string(&serde_json::json!({
1092 "commit": commit_id,
1093 "staged": staged,
1094 "deleted": deleted,
1095 "untracked": untracked,
1096 }))?
1097 );
1098 }
1099
1100 Ok(())
1101}
1102
1103fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1104 let config_path = shard_dir.join("config.json");
1105 if config_path.exists() {
1106 let data = fs::read(&config_path)?;
1107 Ok(metadata::deserialize(&data)?)
1108 } else {
1109 Ok(std::collections::BTreeMap::new())
1110 }
1111}
1112
1113fn save_config(
1114 shard_dir: &Path,
1115 config: &std::collections::BTreeMap<String, String>,
1116) -> Result<()> {
1117 let fmt = MetadataFormat::from_config(config);
1118 let data = metadata::serialize(config, &fmt);
1119 fs::write(shard_dir.join("config.json"), data)?;
1120 Ok(())
1121}
1122
/// Returns the per-user key directory `<home>/.shard/keys`.
///
/// The home directory is taken from `HOME`, falling back to `USERPROFILE`
/// (Windows). Empty values are ignored. Without this, an empty `HOME` would
/// turn the result into the *relative* path `.shard/keys`, and callers would
/// create a bogus `.shard` in whatever the current directory is. Returns
/// `None` when no usable home directory is set.
fn global_keys_dir() -> Option<PathBuf> {
    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .or_else(|| std::env::var_os("USERPROFILE").filter(|home| !home.is_empty()))
        .map(|home| PathBuf::from(home).join(".shard").join("keys"))
}
1128
1129fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1130 let local = shard_dir.join("keys");
1131 if local.join("secret.key").exists() {
1132 return KeyPair::load(&local);
1133 }
1134 if let Some(global) = global_keys_dir() {
1135 if global.join("secret.key").exists() {
1136 return KeyPair::load(&global);
1137 }
1138 }
1139 anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1140}
1141
1142fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1143 let local = shard_dir.join("keys/public.key");
1144 if local.exists() {
1145 return Ok(fs::read(&local)?);
1146 }
1147 if let Some(global) = global_keys_dir() {
1148 let gp = global.join("public.key");
1149 if gp.exists() {
1150 return Ok(fs::read(&gp)?);
1151 }
1152 }
1153 anyhow::bail!(
1154 "no public key found in {} or ~/.shard/keys/",
1155 local.display()
1156 )
1157}
1158
1159fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1160 let config = load_config(shard_dir)?;
1161 if config.get("private").map(|s| s.as_str()) == Some("true") {
1162 let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1163 Ok(Some(encryption::RepoCipher::from_key(&key)))
1164 } else {
1165 Ok(None)
1166 }
1167}
1168
1169pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1170 let shard_dir = path.join(".shard");
1171 if !shard_dir.exists() {
1172 anyhow::bail!("not a shard repository (run `shard init` first)");
1173 }
1174 let config = load_config(&shard_dir)?;
1175 if let Some(key) = key {
1176 match config.get(key) {
1177 Some(value) => info!("{} = {}", key, value),
1178 None => anyhow::bail!("config key not found: {}", key),
1179 }
1180 } else {
1181 for (k, v) in &config {
1182 info!("{} = {}", k, v);
1183 }
1184 }
1185 Ok(())
1186}
1187
1188pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1189 let shard_dir = path.join(".shard");
1190 if !shard_dir.exists() {
1191 anyhow::bail!("not a shard repository (run `shard init` first)");
1192 }
1193 let mut config = load_config(&shard_dir)?;
1194 config.insert(key.to_string(), value.to_string());
1195 save_config(&shard_dir, &config)?;
1196 info!("{} = {}", key, value);
1197 Ok(())
1198}
1199
1200fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1201 let tags_path = shard_dir.join("tags.json");
1202 if tags_path.exists() {
1203 let data = fs::read(&tags_path)?;
1204 Ok(serde_json::from_slice(&data)?)
1205 } else {
1206 Ok(std::collections::BTreeMap::new())
1207 }
1208}
1209
1210fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
1211 let data = serde_json::to_string_pretty(tags)?;
1212 fs::write(shard_dir.join("tags.json"), data)?;
1213 Ok(())
1214}
1215
1216pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1217 let shard_dir = path.join(".shard");
1218 if !shard_dir.exists() {
1219 anyhow::bail!("not a shard repository (run `shard init` first)");
1220 }
1221 let store = Store::open(&shard_dir)?;
1223 load_commit(&store, commit_id)?;
1224 let mut tags = load_tags(&shard_dir)?;
1225 tags.insert(name.to_string(), commit_id.to_string());
1226 save_tags(&shard_dir, &tags)?;
1227 info!("Tagged '{}' -> {}", name, commit_id);
1228 Ok(())
1229}
1230
1231pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1232 let shard_dir = path.join(".shard");
1233 if !shard_dir.exists() {
1234 anyhow::bail!("not a shard repository (run `shard init` first)");
1235 }
1236 let id = match commit_id {
1237 Some(cid) => cid.to_string(),
1238 None => {
1239 let (_, head) = branch::resolve_head(&shard_dir)?;
1240 head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1241 }
1242 };
1243 let store = Store::open(&shard_dir)?;
1245 load_commit(&store, &id)?;
1246 branch::create_branch(&shard_dir, name, &id)
1247}
1248
1249pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1250 let shard_dir = path.join(".shard");
1251 if !shard_dir.exists() {
1252 anyhow::bail!("not a shard repository (run `shard init` first)");
1253 }
1254 branch::delete_branch(&shard_dir, name)
1255}
1256
1257pub fn branch_list(path: &Path) -> Result<()> {
1258 let shard_dir = path.join(".shard");
1259 if !shard_dir.exists() {
1260 anyhow::bail!("not a shard repository (run `shard init` first)");
1261 }
1262 let (current, branches) = branch::list_branches(&shard_dir)?;
1263 if branches.is_empty() {
1264 info!("No branches.");
1265 return Ok(());
1266 }
1267 for (name, commit_id) in &branches {
1268 let prefix = if current.as_deref() == Some(name) {
1269 "* "
1270 } else {
1271 " "
1272 };
1273 info!(
1274 "{}{} ({})",
1275 prefix,
1276 name,
1277 &commit_id[..8.min(commit_id.len())]
1278 );
1279 }
1280 Ok(())
1281}
1282
1283pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1284 let shard_dir = path.join(".shard");
1285 if !shard_dir.exists() {
1286 anyhow::bail!("not a shard repository (run `shard init` first)");
1287 }
1288
1289 let store = Store::open(&shard_dir)?;
1290
1291 let config = load_config(&shard_dir)?;
1292 let fmt = MetadataFormat::from_config(&config);
1293
1294 let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1296 let current_id =
1297 current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1298
1299 let source_id = branch::resolve_rev(&shard_dir, branch)?;
1301 if source_id == current_id {
1302 anyhow::bail!("Already up to date — source is the same commit as HEAD");
1303 }
1304
1305 let current_commit = load_commit(&store, ¤t_id)?;
1307 let source_commit = load_commit(&store, &source_id)?;
1308
1309 let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1311 std::collections::HashMap::new();
1312
1313 for manifest_id in ¤t_commit.manifests {
1314 let data = store.get_chunk(manifest_id)?;
1315 let manifest: FileManifest = metadata::deserialize(&data)?;
1316 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1317 }
1318
1319 for manifest_id in &source_commit.manifests {
1320 let data = store.get_chunk(manifest_id)?;
1321 let manifest: FileManifest = metadata::deserialize(&data)?;
1322 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1323 }
1324
1325 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1327 let keys = load_keypair(&shard_dir)?;
1328 let signing_key = keys.signing_key;
1329 let mut merged_manifest_ids = Vec::new();
1330 for (name, chunks) in merged_manifests.values() {
1331 let compression = Compression::None;
1332 let mut manifest = FileManifest {
1333 name: name.clone(),
1334 size: 0,
1335 chunks: chunks.clone(),
1336 content_type: None,
1337 compression: compression.as_str().to_string(),
1338 merkle_root: Some(FileManifest::merkle_root(chunks)),
1339 created_by: Some(author.to_string()),
1340 created_at: Some(timestamp),
1341 signature: None,
1342 };
1343
1344 let mut unsigned = manifest.clone();
1345 unsigned.signature = None;
1346 let canonical = metadata::serialize_for_signing(&unsigned);
1347 let sig = signing_key.sign(&canonical);
1348 manifest.signature = Some(hex::encode(sig.to_bytes()));
1349
1350 let encoded = metadata::serialize(&manifest, &fmt);
1351 let hash = blake3::hash(&encoded);
1352 store.put_chunk(&crate::chunker::Chunk {
1353 hash,
1354 data: encoded,
1355 offset: 0,
1356 })?;
1357 merged_manifest_ids.push(hash.to_hex().to_string());
1358 }
1359 merged_manifest_ids.sort();
1360
1361 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1363 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1364 let parents = vec![current_id.clone(), source_id.clone()];
1365 let mut commit = Commit {
1366 commit_id: String::new(),
1367 parents,
1368 manifests: merged_manifest_ids,
1369 author: author.to_string(),
1370 message: message.to_string(),
1371 timestamp,
1372 public_key: Some(public_key_hex),
1373 signature: None,
1374 key_id,
1375 };
1376
1377 let json_unsigned = metadata::serialize_for_signing(&commit);
1378 let signature = signing_key.sign(&json_unsigned);
1379 commit.signature = Some(hex::encode(signature.to_bytes()));
1380
1381 let encoded = metadata::serialize(&commit, &fmt);
1382 let hash = blake3::hash(&encoded);
1383 store.put_chunk(&crate::chunker::Chunk {
1384 hash,
1385 data: encoded,
1386 offset: 0,
1387 })?;
1388
1389 let merge_commit_id = hash.to_hex().to_string();
1390
1391 if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1393 anyhow::bail!(
1394 "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1395 merge_commit_id
1396 );
1397 }
1398
1399 if let Some(ref branch_name) = current_branch {
1401 branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1402 branch::set_head_branch(&shard_dir, branch_name)?;
1403 } else {
1404 branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1405 }
1406
1407 if json {
1408 info!(
1409 "{}",
1410 serde_json::to_string(&serde_json::json!({
1411 "commit_id": merge_commit_id,
1412 "message": message,
1413 }))?
1414 );
1415 } else {
1416 info!("Merge commit {} ({})", merge_commit_id, message);
1417 }
1418 Ok(())
1419}
1420
1421pub fn tag_list(path: &Path) -> Result<()> {
1422 let shard_dir = path.join(".shard");
1423 if !shard_dir.exists() {
1424 anyhow::bail!("not a shard repository (run `shard init` first)");
1425 }
1426 let tags = load_tags(&shard_dir)?;
1427 if tags.is_empty() {
1428 info!("No tags.");
1429 } else {
1430 for (name, commit_id) in &tags {
1431 info!("{} -> {}", name, commit_id);
1432 }
1433 }
1434 Ok(())
1435}
1436
1437fn collect_reachable(
1438 store: &Store,
1439 commit_id: &str,
1440 seen_commits: &mut std::collections::HashSet<String>,
1441 reachable: &mut std::collections::HashSet<String>,
1442) -> Result<()> {
1443 if !seen_commits.insert(commit_id.to_string()) {
1444 return Ok(());
1445 }
1446
1447 reachable.insert(commit_id.to_string());
1448
1449 let commit = match load_commit(store, commit_id) {
1450 Ok(c) => c,
1451 Err(_) => return Ok(()),
1452 };
1453
1454 for manifest_id in &commit.manifests {
1455 reachable.insert(manifest_id.clone());
1456
1457 if let Ok(data) = store.get_chunk(manifest_id) {
1458 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1459 for chunk_id in &manifest.chunks {
1460 reachable.insert(chunk_id.clone());
1461 }
1462 }
1463 }
1464 }
1465
1466 for parent_id in &commit.parents {
1467 collect_reachable(store, parent_id, seen_commits, reachable)?;
1468 }
1469
1470 Ok(())
1471}
1472
1473pub fn prune(path: &Path, json: bool) -> Result<()> {
1474 let shard_dir = path.join(".shard");
1475 if !shard_dir.exists() {
1476 anyhow::bail!("not a shard repository (run `shard init` first)");
1477 }
1478
1479 let config = load_config(&shard_dir)?;
1480 let fmt = MetadataFormat::from_config(&config);
1481
1482 let store = Store::open(&shard_dir)?;
1483 let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1484
1485 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1487 if let Some(ref head) = head_commit {
1488 collect_reachable(
1489 &store,
1490 head,
1491 &mut std::collections::HashSet::new(),
1492 &mut reachable,
1493 )?;
1494 }
1495
1496 if let Ok(branches) = branch::list_branches(&shard_dir) {
1498 for (_, commit_id) in branches.1 {
1499 collect_reachable(
1500 &store,
1501 &commit_id,
1502 &mut std::collections::HashSet::new(),
1503 &mut reachable,
1504 )?;
1505 }
1506 }
1507
1508 let tags = load_tags(&shard_dir)?;
1510 for commit_id in tags.values() {
1511 collect_reachable(
1512 &store,
1513 commit_id,
1514 &mut std::collections::HashSet::new(),
1515 &mut reachable,
1516 )?;
1517 }
1518
1519 let index = Index::load(&shard_dir.join("index"), &fmt)?;
1521 for manifest in index.files.values() {
1522 let json = metadata::serialize(manifest, &fmt);
1523 let hash = blake3::hash(&json);
1524 let hash_hex = hash.to_hex().to_string();
1525 reachable.insert(hash_hex);
1526 for chunk_hash in &manifest.chunks {
1527 reachable.insert(chunk_hash.clone());
1528 }
1529 }
1530
1531 let all_chunks = store.iter_chunks()?;
1533 let mut pruned = 0u64;
1534 let mut kept = 0u64;
1535 for (hash_hex, full_path) in &all_chunks {
1536 if !reachable.contains(hash_hex) {
1537 store.delete_chunk(hash_hex, Some(full_path))?;
1538 pruned += 1;
1539 } else {
1540 kept += 1;
1541 }
1542 }
1543
1544 if json {
1545 info!(
1546 "{}",
1547 serde_json::to_string(&serde_json::json!({
1548 "pruned": pruned,
1549 "remaining": kept,
1550 }))?
1551 );
1552 } else {
1553 info!("Pruned {} objects. {} objects remain.", pruned, kept);
1554 }
1555 Ok(())
1556}
1557
1558pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1559 let shard_dir = path.join(".shard");
1560 if !shard_dir.exists() {
1561 anyhow::bail!("not a shard repository (run `shard init` first)");
1562 }
1563
1564 if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1566 anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1567 }
1568
1569 let peers_path = shard_dir.join("peers.json");
1570 let mut peers: Vec<String> = if peers_path.exists() {
1571 let data = fs::read(&peers_path)?;
1572 serde_json::from_slice(&data)?
1573 } else {
1574 Vec::new()
1575 };
1576
1577 let added = if !peers.contains(&multiaddr.to_string()) {
1578 peers.push(multiaddr.to_string());
1579 let data = serde_json::to_vec(&peers)?;
1580 fs::write(peers_path, data)?;
1581 true
1582 } else {
1583 false
1584 };
1585
1586 if json {
1587 info!(
1588 "{}",
1589 serde_json::to_string(&serde_json::json!({
1590 "multiaddr": multiaddr,
1591 "added": added,
1592 }))?
1593 );
1594 } else if added {
1595 info!("Added peer: {}", multiaddr);
1596 } else {
1597 info!("Peer already exists: {}", multiaddr);
1598 }
1599
1600 Ok(())
1601}
1602
1603fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1604 let peers_path = shard_dir.join("peers.json");
1605 if peers_path.exists() {
1606 let data = fs::read(peers_path)?;
1607 Ok(serde_json::from_slice(&data)?)
1608 } else {
1609 Ok(Vec::new())
1610 }
1611}
1612
/// Path of the file that lists the hex-encoded public keys allowed to
/// authenticate.
fn authorized_keys_path(shard_dir: &Path) -> PathBuf {
    let mut keys_file = shard_dir.to_path_buf();
    keys_file.push("authorized_keys");
    keys_file
}
1616
1617fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1618 let path = authorized_keys_path(shard_dir);
1619 if !path.exists() {
1620 return Ok(Vec::new());
1621 }
1622 let content = fs::read_to_string(&path)?;
1623 let mut keys = Vec::new();
1624 for line in content.lines() {
1625 let line = line.trim();
1626 if line.is_empty() || line.starts_with('#') {
1627 continue;
1628 }
1629 let bytes = hex::decode(line)?;
1630 let arr: [u8; 32] = bytes
1631 .as_slice()
1632 .try_into()
1633 .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1634 keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1635 }
1636 Ok(keys)
1637}
1638
1639pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1640 let bytes = hex::decode(public_key_hex)?;
1642 let arr: [u8; 32] = bytes
1643 .as_slice()
1644 .try_into()
1645 .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1646 let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1647
1648 let path = authorized_keys_path(shard_dir);
1649 let mut content = if path.exists() {
1650 fs::read_to_string(&path)?
1651 } else {
1652 String::new()
1653 };
1654 if content.lines().any(|l| l.trim() == public_key_hex) {
1656 info!("Key already authorized");
1657 return Ok(());
1658 }
1659 content.push_str(public_key_hex);
1660 content.push('\n');
1661 fs::write(&path, content)?;
1662 info!("Authorized key added");
1663 Ok(())
1664}
1665
1666pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
1667 let shard_dir = path.join(".shard");
1668 if !shard_dir.exists() {
1669 anyhow::bail!("not a shard repository (run `shard init` first)");
1670 }
1671 let file = fs::File::create(output)?;
1672 let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1673 let mut archive = tar::Builder::new(encoder);
1674 archive.append_dir_all(".shard", &shard_dir)?;
1675 archive.finish()?;
1676 if json {
1677 info!(
1678 "{}",
1679 serde_json::to_string(&serde_json::json!({
1680 "path": output.to_string_lossy(),
1681 }))?
1682 );
1683 } else {
1684 info!("Backup created: {}", output.display());
1685 }
1686 Ok(())
1687}
1688
1689pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1690 let shard_dir = path.join(".shard");
1691 if !shard_dir.exists() {
1692 anyhow::bail!("not a shard repository (run `shard init` first)");
1693 }
1694 let store = Store::open(&shard_dir)?;
1695 let cipher = maybe_load_cipher(&shard_dir)?;
1696 let commit = load_commit(&store, commit_id)?;
1697 let mut files = Vec::new();
1698 for manifest_id in &commit.manifests {
1699 let data = store.get_chunk(manifest_id)?;
1700 let manifest: FileManifest = metadata::deserialize(&data)?;
1701 let compression = manifest.compression.parse::<Compression>()?;
1702 if !json {
1703 info!("Exporting file: {}", manifest.name);
1704 }
1705 let mut file_data = Vec::new();
1706 for chunk_id in &manifest.chunks {
1707 let chunk_data = store.get_chunk(chunk_id)?;
1708 let decrypted = match &cipher {
1709 Some(c) => c.decrypt(&chunk_data)?,
1710 None => chunk_data,
1711 };
1712 let decompressed = compression.decompress(&decrypted)?;
1713 file_data.extend_from_slice(&decompressed);
1714 }
1715 let out_path = output_dir.join(&manifest.name);
1716 if let Some(parent) = out_path.parent() {
1717 fs::create_dir_all(parent)?;
1718 }
1719 fs::write(&out_path, file_data)?;
1720 if !json {
1721 info!(" -> {}", out_path.display());
1722 }
1723 files.push(manifest.name);
1724 }
1725 if json {
1726 info!(
1727 "{}",
1728 serde_json::to_string(&serde_json::json!({
1729 "commit_id": commit_id,
1730 "files": files,
1731 "output_dir": output_dir.to_string_lossy(),
1732 }))?
1733 );
1734 } else {
1735 info!("Export complete.");
1736 }
1737 Ok(())
1738}
1739
1740pub fn import(
1741 path: &Path,
1742 source_dir: &Path,
1743 message: &str,
1744 author: &str,
1745 json: bool,
1746) -> Result<()> {
1747 let shard_dir = path.join(".shard");
1748 if !shard_dir.exists() {
1749 anyhow::bail!("not a shard repository (run `shard init` first)");
1750 }
1751 let config = load_config(&shard_dir)?;
1753 let compression: Compression = config
1754 .get("compression")
1755 .map(|s| s.as_str())
1756 .unwrap_or("zstd")
1757 .parse()?;
1758 let chunker_mode = chunker::ChunkerMode::from_config(&config);
1759 let fmt = MetadataFormat::from_config(&config);
1760 let store = Store::open(&shard_dir)?;
1761 let cipher = maybe_load_cipher(&shard_dir)?;
1762 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
1763 if !source_dir.is_dir() {
1764 anyhow::bail!("Source must be a directory");
1765 }
1766 for entry in walkdir::WalkDir::new(source_dir)
1767 .into_iter()
1768 .filter_entry(|e| {
1769 e.file_name()
1770 .to_str()
1771 .map(|s| !s.starts_with('.'))
1772 .unwrap_or(false)
1773 })
1774 {
1775 let entry = entry?;
1776 if entry.file_type().is_file() {
1777 add_file(
1778 path,
1779 entry.path(),
1780 &store,
1781 &mut index,
1782 &compression,
1783 &chunker_mode,
1784 cipher.as_ref(),
1785 json,
1786 )?;
1787 }
1788 }
1789 index.save(&shard_dir.join("index"), &fmt)?;
1790 if !index.files.is_empty() {
1792 commit(path, message, author, json)?;
1793 } else if json {
1794 info!(
1795 "{}",
1796 serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
1797 );
1798 } else {
1799 info!("No files found to import.");
1800 }
1801 Ok(())
1802}
1803
1804pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
1805 let shard_dir = path.join(".shard");
1806 if shard_dir.exists() {
1807 anyhow::bail!(
1808 "Repository already exists — remove .shard first or use a different directory"
1809 );
1810 }
1811 let file = fs::File::open(backup_file)?;
1812 let decoder = flate2::read::GzDecoder::new(file);
1813 let mut archive = tar::Archive::new(decoder);
1814 archive.unpack(path)?;
1815 if !path.join(".shard").exists() {
1817 anyhow::bail!("Backup does not contain a valid .shard directory");
1818 }
1819 if json {
1820 info!(
1821 "{}",
1822 serde_json::to_string(&serde_json::json!({
1823 "backup": backup_file.to_string_lossy(),
1824 }))?
1825 );
1826 } else {
1827 info!("Restored from {}", backup_file.display());
1828 }
1829 Ok(())
1830}
1831
1832struct RepoProvider {
1833 store: Store,
1834 shard_dir: std::path::PathBuf,
1835}
1836
1837impl shard_net::p2p::ShardContentProvider for RepoProvider {
1838 fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1839 self.store.get_chunk(id).ok()
1840 }
1841 fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1842 self.store.get_chunk(id).ok()
1843 }
1844 fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1845 let hash = blake3::hash(data);
1846 let hex = hash.to_hex().to_string();
1847 if hex != id {
1848 return false;
1849 }
1850 self.store
1851 .put_chunk(&crate::chunker::Chunk {
1852 hash,
1853 data: data.to_vec(),
1854 offset: 0,
1855 })
1856 .is_ok()
1857 }
1858 fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1859 use ed25519_dalek::Verifier;
1860 let pk_bytes: [u8; 32] = match public_key.try_into() {
1861 Ok(b) => b,
1862 Err(_) => return false,
1863 };
1864 let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1865 Ok(k) => k,
1866 Err(_) => return false,
1867 };
1868 let sig_bytes: [u8; 64] = match signature.try_into() {
1869 Ok(b) => b,
1870 Err(_) => return false,
1871 };
1872 let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1873 if pk.verify(nonce, &sig).is_err() {
1874 return false;
1875 }
1876 if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1878 if !keys.is_empty() {
1879 return keys.contains(&pk);
1880 }
1881 }
1882 true
1883 }
1884 fn repo_public_key(&self) -> Option<Vec<u8>> {
1885 let keys = load_keypair(&self.shard_dir).ok()?;
1886 Some(keys.verifying_key.to_bytes().to_vec())
1887 }
1888}
1889
1890fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
1892 let store = Store::open(shard_dir)?;
1893 let commit = load_commit(&store, commit_id)?;
1894 let mut file_count = 0u64;
1895 let mut total_size = 0u64;
1896 for mid in &commit.manifests {
1897 if let Ok(data) = store.get_chunk(mid) {
1898 if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
1899 file_count += 1;
1900 total_size += m.size;
1901 }
1902 }
1903 }
1904 Ok((file_count, total_size))
1905}
1906
1907pub async fn share(path: &Path, json: bool) -> Result<()> {
1908 let shard_dir = path.join(".shard");
1909 if !shard_dir.exists() {
1910 anyhow::bail!("not a shard repository (run `shard init` first)");
1911 }
1912
1913 let mut node = shard_net::p2p::Node::new().await?;
1914
1915 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1917 node.subscribe(&ann_topic)?;
1918
1919 let peers = load_peers(&shard_dir)?;
1921 for peer in peers {
1922 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1923 let _ = node.swarm.dial(addr);
1924 }
1925 }
1926
1927 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
1929
1930 node.listen("/ip4/0.0.0.0/tcp/0").await?;
1931 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1932 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1933
1934 let config = load_config(&shard_dir)?;
1935 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1936
1937 let store = Store::open(&shard_dir)?;
1938 let provider = RepoProvider {
1939 store,
1940 shard_dir: shard_dir.clone(),
1941 };
1942
1943 if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
1945 let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
1946 let ann = shard_net::protocol::Announcement {
1947 commit_id: head,
1948 file_count,
1949 total_size,
1950 repo_name: repo_name.clone(),
1951 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
1952 };
1953 let payload = serde_json::to_vec(&ann)?;
1954 let _ = node.publish(&ann_topic, payload);
1955 }
1956
1957 if json {
1958 info!(
1959 "{}",
1960 serde_json::to_string(&serde_json::json!({
1961 "status": "sharing",
1962 "peer_id": node.local_peer_id().to_string(),
1963 }))?
1964 );
1965 } else {
1966 info!("Sharing repository...");
1967 }
1968 node.run(provider).await;
1969
1970 Ok(())
1971}
1972
1973pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
1976 let mut node = shard_net::p2p::Node::new().await?;
1977 node.listen(listen_addr).await?;
1978 if json {
1979 info!(
1980 "{}",
1981 serde_json::to_string(&serde_json::json!({
1982 "status": "relay active",
1983 "listen": listen_addr,
1984 "peer_id": node.local_peer_id().to_string(),
1985 }))?
1986 );
1987 } else {
1988 info!("Relay server active on {}", listen_addr);
1989 info!("Peer ID: {}", node.local_peer_id());
1990 info!("Ready to accept circuit relay v2 reservations");
1991 }
1992 node.run(EmptyProvider).await;
1993 Ok(())
1994}
1995
1996struct EmptyProvider;
1998impl shard_net::p2p::ShardContentProvider for EmptyProvider {
1999 fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
2000 None
2001 }
2002 fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
2003 None
2004 }
2005 fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
2006 false
2007 }
2008 fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
2009 false
2010 }
2011 fn repo_public_key(&self) -> Option<Vec<u8>> {
2012 None
2013 }
2014}
2015
2016pub async fn sync(path: &Path, _json: bool) -> Result<()> {
2017 let shard_dir = path.join(".shard");
2018 if !shard_dir.exists() {
2019 anyhow::bail!("not a shard repository (run `shard init` first)");
2020 }
2021
2022 let config = load_config(&shard_dir)?;
2023 let repo_id = config.get("repo_id").cloned().unwrap_or_default();
2024 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2025 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2026 let repo_topic =
2027 shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
2028
2029 let mut node = shard_net::p2p::Node::new().await?;
2030 node.subscribe(&ann_topic)?;
2031 node.subscribe(&repo_topic)?;
2032 node.listen("/ip4/0.0.0.0/tcp/0").await?;
2033 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2034 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2035
2036 let peers = load_peers(&shard_dir)?;
2038 for peer in peers {
2039 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2040 let _ = node.swarm.dial(addr);
2041 }
2042 }
2043
2044 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2046
2047 let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2049 if let Ok(payload) = serde_json::to_vec(ann) {
2050 let _ = node.publish(&ann_topic, payload.clone());
2051 let _ = node.publish(&repo_topic, payload);
2052 }
2053 };
2054
2055 let head_commit = branch::resolve_head(&shard_dir)?.1;
2056
2057 if let Some(ref head) = head_commit {
2059 let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2060 let ann = shard_net::protocol::Announcement {
2061 commit_id: head.clone(),
2062 file_count,
2063 total_size,
2064 repo_name: repo_name.clone(),
2065 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2066 };
2067 publish_ann(&mut node, &ann);
2068 if !_json {
2069 info!("Announced commit {} on sync topic", head)
2070 }
2071 } else if !_json {
2072 info!("No commits to announce");
2073 }
2074
2075 if !_json {
2076 info!("Syncing on topic with peer id: {}", node.local_peer_id());
2077 }
2078 let _ = std::io::stdout().flush();
2079
2080 let store = Store::open(&shard_dir)?;
2081 let mut provider = RepoProvider {
2082 store,
2083 shard_dir: shard_dir.clone(),
2084 };
2085
2086 let mut interval = tokio::time::interval(Duration::from_secs(5));
2087 let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2088 HashMap::new();
2089 let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2090 let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2091 let path_buf = path.to_path_buf();
2092
2093 loop {
2094 tokio::select! {
2095 _ = tokio::signal::ctrl_c() => {
2096 info!("\nSync shutting down...");
2097 break Ok(());
2098 }
2099 _ = interval.tick() => {
2100 request_counts.clear();
2102 announce_counts.clear();
2103 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2104 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2105 let ann = shard_net::protocol::Announcement {
2106 commit_id: head.clone(),
2107 file_count: fc,
2108 total_size: ts,
2109 repo_name: repo_name.clone(),
2110 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2111 };
2112 publish_ann(&mut node, &ann);
2113 info!("Re-announced commit {} on sync topic", head);
2114 }
2115 }
2116 event = node.swarm.select_next_some() => {
2117 match event {
2118 shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2119 info!("Listening on {address:?}");
2120 let _ = std::io::stdout().flush();
2121 }
2122 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2123 shard_net::p2p::ShardBehaviourEvent::Mdns(
2124 shard_net::libp2p::mdns::Event::Discovered(list),
2125 ),
2126 ) => {
2127 for (peer_id, multiaddr) in list {
2128 info!("mDNS discovered: {peer_id} {multiaddr}");
2129 address_book.entry(peer_id).or_default().push(multiaddr.clone());
2130 node.swarm
2131 .behaviour_mut()
2132 .gossipsub
2133 .add_explicit_peer(&peer_id);
2134 node.swarm
2135 .behaviour_mut()
2136 .kademlia
2137 .add_address(&peer_id, multiaddr);
2138 }
2139 }
2140 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2141 shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2142 list,
2143 )),
2144 ) => {
2145 for (peer_id, _multiaddr) in list {
2146 info!("mDNS expired: {peer_id}");
2147 node.swarm
2148 .behaviour_mut()
2149 .gossipsub
2150 .remove_explicit_peer(&peer_id);
2151 }
2152 }
2153 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2154 shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2155 shard_net::libp2p::gossipsub::Event::Message {
2156 propagation_source,
2157 message,
2158 ..
2159 },
2160 ),
2161 ) => {
2162 if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2164 let peer = propagation_source;
2165 let commit_id_owned = ann.commit_id;
2166 info!(
2167 "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2168 peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2169 );
2170 if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2172 if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2173 address_book.entry(peer).or_default().push(addr);
2174 }
2175 }
2176 let rate_key = (peer, commit_id_owned.clone());
2178 if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2179 warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2180 } else {
2181 *announce_counts.entry(rate_key).or_insert(0) += 1;
2182 let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2184 if our_head != commit_id_owned {
2185 let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2186 let reply = shard_net::protocol::Announcement {
2187 commit_id: our_head,
2188 file_count: fc,
2189 total_size: ts,
2190 repo_name: repo_name.clone(),
2191 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2192 };
2193 publish_ann(&mut node, &reply);
2194 }
2195 if let Some(addrs) = address_book.get(&peer) {
2196 if let Some(addr) = addrs.first() {
2197 let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2198 let path_clone = path_buf.clone();
2199 tokio::spawn(async move {
2200 match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2201 Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2202 Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2203 }
2204 });
2205 }
2206 }
2207 }
2208 }
2209 }
2210 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2211 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2212 shard_net::libp2p::request_response::Event::Message { peer, message },
2213 ),
2214 ) => {
2215 if let shard_net::libp2p::request_response::Message::Request {
2216 request, channel, ..
2217 } = message
2218 {
2219 let req_count = request_counts.entry(peer).or_insert(0u32);
2221 *req_count += 1;
2222 if *req_count > 50 {
2223 warn!("Dropping request from {}: rate limit exceeded", peer);
2224 } else {
2226 info!("Received request from {}", peer);
2227 node.serve_request(&peer, &mut provider, request, channel);
2228 }
2229 } else {
2230 info!("Received Response from {}", peer);
2231 }
2232 }
2233 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2234 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2235 shard_net::libp2p::request_response::Event::OutboundFailure {
2236 peer, error, ..
2237 },
2238 ),
2239 ) => {
2240 error!("Outbound failure to {}: {:?}", peer, error);
2241 }
2242 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2243 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2244 shard_net::libp2p::request_response::Event::InboundFailure {
2245 peer, error, ..
2246 },
2247 ),
2248 ) => {
2249 error!("Inbound failure from {}: {:?}", peer, error);
2250 }
2251 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2252 shard_net::p2p::ShardBehaviourEvent::Identify(
2253 shard_net::libp2p::identify::Event::Received { peer_id, info },
2254 ),
2255 ) => {
2256 info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2257 for addr in info.listen_addrs {
2258 address_book.entry(peer_id).or_default().push(addr);
2259 }
2260 let _ = std::io::stdout().flush();
2261 }
2262 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2263 shard_net::p2p::ShardBehaviourEvent::Identify(event),
2264 ) => {
2265 info!("Identify event: {:?}", event);
2266 }
2267 shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2268 info!("Connection established with {}", peer_id);
2269 if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2272 address_book.entry(peer_id).or_default().push(address.clone());
2273 }
2274 node.swarm
2275 .behaviour_mut()
2276 .gossipsub
2277 .add_explicit_peer(&peer_id);
2278 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2280 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2281 let ann = shard_net::protocol::Announcement {
2282 commit_id: head.clone(),
2283 file_count: fc,
2284 total_size: ts,
2285 repo_name: repo_name.clone(),
2286 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2287 };
2288 publish_ann(&mut node, &ann);
2289 }
2290 }
2291 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2292 shard_net::p2p::ShardBehaviourEvent::Relay(event),
2293 ) => {
2294 info!("Relay event: {:?}", event);
2295 }
2296 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2297 shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2298 ) => {
2299 info!("DCUtR event: {:?}", event);
2300 }
2301 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2302 shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2303 ) => {
2304 info!("AutoNAT event: {:?}", event);
2305 }
2306 shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2307 local_addr,
2308 send_back_addr,
2309 ..
2310 } => {
2311 info!(
2312 "Incoming connection from {} to {}",
2313 send_back_addr, local_addr
2314 );
2315 }
2316 e => {
2317 info!("Event: {:?}", e);
2318 }
2319 }
2320 }
2321 }
2322 }
2323}
2324
2325pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2326 let shard_dir = path.join(".shard");
2327 if !shard_dir.exists() {
2330 init(path, "flat", "zstd", "fixed", None, false, false)?;
2331 }
2332
2333 let store = Store::open(&shard_dir)?;
2334 let cipher = maybe_load_cipher(&shard_dir)?;
2335
2336 let mut node = shard_net::p2p::Node::new().await?;
2337
2338 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2340 let peer_id = match multiaddr.iter().last() {
2341 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2342 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2343 };
2344
2345 if !json {
2347 info!("Pulling commit {} from {}...", commit_id, peer);
2348 }
2349 let commit_data = node
2350 .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2351 .await?;
2352 let hash = blake3::hash(&commit_data);
2353 if hash.to_hex().to_string() != commit_id {
2354 anyhow::bail!("Commit hash mismatch");
2355 }
2356 let chunk = crate::chunker::Chunk {
2357 hash,
2358 data: commit_data.clone(),
2359 offset: 0,
2360 };
2361 store.put_chunk(&chunk)?;
2362
2363 let commit: Commit = metadata::deserialize(&commit_data)?;
2364 if !json {
2365 info!("Got commit: {}", commit.message);
2366 }
2367
2368 let keys_dir = shard_dir.join("keys");
2370 if let Some(kid) = &commit.key_id {
2371 if keys_dir.join("records").exists() {
2372 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2373 let missing_rotations: Vec<&KeyRotation> = chain
2374 .iter()
2375 .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2376 .collect();
2377 if !missing_rotations.is_empty() {
2378 if !json {
2379 info!(
2380 "Fetching {} key rotation records from peer...",
2381 missing_rotations.len()
2382 );
2383 }
2384 let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2385 missing_rotations
2386 .iter()
2387 .map(|r| {
2388 (
2389 r.rotation_id.clone(),
2390 shard_net::protocol::ShardRequest::GetChunk(
2391 r.rotation_id.clone(),
2392 ),
2393 )
2394 })
2395 .collect();
2396 if let Ok(rot_results) = node
2397 .request_parallel(&multiaddr, peer_id, rot_requests)
2398 .await
2399 {
2400 for (rot_id, rot_data) in &rot_results {
2401 let rh = blake3::hash(rot_data);
2402 if rh.to_hex().to_string() != *rot_id {
2403 info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2404 continue;
2405 }
2406 store.put_chunk(&crate::chunker::Chunk {
2407 hash: rh,
2408 data: rot_data.clone(),
2409 offset: 0,
2410 })?;
2411 }
2412 if !json {
2413 info!("Key rotation records synced from peer.");
2414 }
2415 }
2416 }
2417 }
2418 }
2419 }
2420
2421 if let Some(pk_hex) = &commit.public_key {
2423 let pk_bytes = hex::decode(pk_hex)?;
2424 let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2425 let mut config = load_config(&shard_dir)?;
2426 config.insert("repo_id".to_string(), repo_id);
2427 save_config(&shard_dir, &config)?;
2428 }
2429
2430 let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2432 .manifests
2433 .iter()
2434 .map(|id| {
2435 (
2436 id.clone(),
2437 shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2438 )
2439 })
2440 .collect();
2441 let manifest_results = node
2442 .request_parallel(&multiaddr, peer_id, manifest_requests)
2443 .await?;
2444
2445 let mut all_chunk_ids: Vec<String> = Vec::new();
2446 let mut file_manifests: Vec<FileManifest> = Vec::new();
2447 let mut chunk_compression: HashMap<String, String> = HashMap::new();
2449
2450 for (manifest_id, manifest_data) in &manifest_results {
2451 let hash = blake3::hash(manifest_data);
2452 if hash.to_hex().to_string() != *manifest_id {
2453 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2454 }
2455 let chunk = crate::chunker::Chunk {
2456 hash,
2457 data: manifest_data.clone(),
2458 offset: 0,
2459 };
2460 store.put_chunk(&chunk)?;
2461 let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2462 if !json {
2463 info!(
2464 "Fetching file: {} (compression: {})",
2465 manifest.name, manifest.compression
2466 );
2467 }
2468 for cid in &manifest.chunks {
2469 chunk_compression.insert(cid.clone(), manifest.compression.clone());
2470 }
2471 all_chunk_ids.extend(manifest.chunks.clone());
2472 file_manifests.push(manifest);
2473 }
2474
2475 let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2477 let partial_chunks = partial.list_chunks()?;
2478 let mut recovered = 0usize;
2479 for chunk_id in &partial_chunks {
2480 if let Ok(data) = partial.load_chunk(chunk_id) {
2481 let hash = blake3::hash(&data);
2482 if hash.to_hex().to_string() == *chunk_id {
2483 let chunk = crate::chunker::Chunk {
2485 hash,
2486 data: data.clone(),
2487 offset: 0,
2488 };
2489 if store.put_chunk(&chunk).is_ok() {
2490 recovered += 1;
2491 }
2492 } else {
2493 let _ = partial.remove_chunk(chunk_id);
2495 }
2496 }
2497 }
2498 if recovered > 0 {
2499 info!("Recovered {} chunks from partial transfer", recovered);
2500 }
2501
2502 let needed_chunks: Vec<String> = all_chunk_ids
2504 .into_iter()
2505 .filter(|id| store.get_chunk(id).is_err())
2506 .collect();
2507
2508 if !needed_chunks.is_empty() {
2509 if !json {
2510 info!("Fetching {} chunks...", needed_chunks.len());
2511 }
2512 let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2513 .iter()
2514 .map(|id| {
2515 (
2516 id.clone(),
2517 shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2518 )
2519 })
2520 .collect();
2521 let chunk_results = node
2522 .request_parallel(&multiaddr, peer_id, chunk_requests)
2523 .await?;
2524 for (chunk_id, chunk_data) in &chunk_results {
2525 let compression: Compression = chunk_compression
2527 .get(chunk_id)
2528 .map(|s| s.as_str())
2529 .unwrap_or("none")
2530 .parse()?;
2531 let decrypted = match &cipher {
2533 Some(c) => c.decrypt(chunk_data)?,
2534 None => chunk_data.clone(),
2535 };
2536 let decompressed = compression.decompress(&decrypted)?;
2537 let hash = blake3::hash(&decompressed);
2538 if hash.to_hex().to_string() != *chunk_id {
2539 anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2540 }
2541 let chunk = crate::chunker::Chunk {
2543 hash,
2544 data: chunk_data.clone(),
2545 offset: 0,
2546 };
2547 store.put_chunk(&chunk)?;
2548 partial.save_chunk(chunk_id, chunk_data)?;
2550 }
2551 }
2552
2553 for manifest in &file_manifests {
2555 let compression = manifest.compression.parse::<Compression>()?;
2556 let mut file_data = Vec::new();
2557 for chunk_id in &manifest.chunks {
2558 let stored = store.get_chunk(chunk_id)?;
2559 let decrypted = match &cipher {
2560 Some(c) => c.decrypt(&stored)?,
2561 None => stored,
2562 };
2563 let decompressed = compression.decompress(&decrypted)?;
2564 file_data.extend_from_slice(&decompressed);
2565 }
2566 fs::write(path.join(&manifest.name), file_data)?;
2567 if !json {
2568 info!(
2569 "Reconstructed file: {} ({} bytes)",
2570 manifest.name, manifest.size
2571 );
2572 }
2573 }
2574
2575 partial.cleanup()?;
2577
2578 if json {
2579 info!(
2580 "{}",
2581 serde_json::to_string(&serde_json::json!({
2582 "status": "pull complete",
2583 "commit_id": commit_id,
2584 }))?
2585 );
2586 } else {
2587 info!("Pull complete.");
2588 }
2589 Ok(())
2590}
2591
2592pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2593 let shard_dir = path.join(".shard");
2594 if !shard_dir.exists() {
2595 anyhow::bail!("not a shard repository (run `shard init` first)");
2596 }
2597 let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2598 if json {
2599 info!("{}", serde_json::to_string(&transfers)?);
2600 } else {
2601 if transfers.is_empty() {
2602 info!("No incomplete transfers.");
2603 } else {
2604 for t in &transfers {
2605 info!("Incomplete transfer: {}", t);
2606 }
2607 }
2608 }
2609 Ok(())
2610}
2611
2612pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2613 let shard_dir = path.join(".shard");
2614 if !shard_dir.exists() {
2615 anyhow::bail!("not a shard repository (run `shard init` first)");
2616 }
2617 partial::remove_transfer(&shard_dir, commit_id)?;
2618 info!("Removed transfer tracking for {}", commit_id);
2619 Ok(())
2620}
2621
2622pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
2623 let shard_dir = path.join(".shard");
2624 if !shard_dir.exists() {
2625 anyhow::bail!("not a shard repository (run `shard init` first)");
2626 }
2627
2628 let (_, head_id) = branch::resolve_head(&shard_dir)?;
2629 let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
2630
2631 let store = Store::open(&shard_dir)?;
2632
2633 let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
2635 std::collections::BTreeMap::new();
2636
2637 let mut seen = std::collections::HashSet::new();
2639 let mut stack = vec![head_id.clone()];
2640 while let Some(cid) = stack.pop() {
2641 if !seen.insert(cid.clone()) {
2642 continue;
2643 }
2644 if let Ok(data) = store.get_chunk(&cid) {
2645 objects.insert(cid, data.clone());
2646 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
2647 if let Some(kid) = &commit.key_id {
2649 let keys_dir = shard_dir.join("keys");
2650 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2651 for rot in &chain {
2652 let rj = serde_json::to_vec(rot)?;
2653 let rh = blake3::hash(&rj);
2654 if !store.has_chunk(rh.to_hex().as_ref()) {
2655 store.put_chunk(&crate::chunker::Chunk {
2656 hash: rh,
2657 data: rj.clone(),
2658 offset: 0,
2659 })?;
2660 }
2661 objects.insert(rot.rotation_id.clone(), rj);
2662 }
2663 }
2664 }
2665 for mid in &commit.manifests {
2666 if let Ok(manifest_data) = store.get_chunk(mid) {
2667 objects.insert(mid.clone(), manifest_data.clone());
2668 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
2669 {
2670 for cid in &manifest.chunks {
2671 if let Ok(chunk_data) = store.get_chunk(cid) {
2672 objects.insert(cid.clone(), chunk_data);
2673 }
2674 }
2675 }
2676 }
2677 }
2678 for parent in &commit.parents {
2679 stack.push(parent.clone());
2680 }
2681 }
2682 }
2683 }
2684
2685 if !json {
2686 info!(
2687 "Pushing {} objects ({} bytes)...",
2688 objects.len(),
2689 objects.values().map(|v| v.len() as u64).sum::<u64>()
2690 );
2691 }
2692
2693 let mut node = shard_net::p2p::Node::new().await?;
2695 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2696 let peer_id = match multiaddr.iter().last() {
2697 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2698 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2699 };
2700
2701 for (id, data) in &objects {
2702 node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
2703 .await?;
2704 }
2705
2706 if json {
2707 info!(
2708 "{}",
2709 serde_json::to_string(&serde_json::json!({
2710 "status": "push complete",
2711 "objects": objects.len(),
2712 }))?
2713 );
2714 } else {
2715 info!("Push complete ({} objects).", objects.len());
2716 }
2717 Ok(())
2718}
2719
2720pub fn key_rotate(path: &Path) -> Result<()> {
2723 let shard_dir = path.join(".shard");
2724 if !shard_dir.exists() {
2725 anyhow::bail!("not a shard repository (run `shard init` first)");
2726 }
2727 let keys_dir = shard_dir.join("keys");
2728 let rotation = keychain::rotate_signing_key(&keys_dir)?;
2729
2730 if let Some(global) = global_keys_dir() {
2732 fs::create_dir_all(&global).ok();
2733 if let Ok(new_keys) = KeyPair::load(&keys_dir) {
2734 let _ = new_keys.save(&global);
2735 }
2736 }
2737
2738 let store = Store::open(&shard_dir)?;
2740 let json = serde_json::to_vec(&rotation)?;
2741 let hash = blake3::hash(&json);
2742 if !store.has_chunk(hash.to_hex().as_ref()) {
2743 store.put_chunk(&crate::chunker::Chunk {
2744 hash,
2745 data: json,
2746 offset: 0,
2747 })?;
2748 }
2749
2750 let rotations = keychain::load_rotations(&keys_dir)?;
2752 for rot in &rotations {
2753 let rj = serde_json::to_vec(rot)?;
2754 let rh = blake3::hash(&rj);
2755 if !store.has_chunk(rh.to_hex().as_ref()) {
2756 store.put_chunk(&crate::chunker::Chunk {
2757 hash: rh,
2758 data: rj,
2759 offset: 0,
2760 })?;
2761 }
2762 }
2763
2764 info!(
2765 "Key rotated: {} -> {}",
2766 rotation.old_key_id, rotation.new_key_id
2767 );
2768 info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
2769 Ok(())
2770}
2771
2772pub fn key_list(path: &Path, json: bool) -> Result<()> {
2774 let shard_dir = path.join(".shard");
2775 if !shard_dir.exists() {
2776 anyhow::bail!("not a shard repository (run `shard init` first)");
2777 }
2778 let keys_dir = shard_dir.join("keys");
2779 let records = keychain::load_records(&keys_dir)?;
2780 let current_id = keychain::get_current_key_id(&keys_dir)?;
2781
2782 if json {
2783 info!(
2784 "{}",
2785 serde_json::to_string_pretty(&serde_json::json!({
2786 "current": current_id,
2787 "records": &records,
2788 }))?
2789 );
2790 } else {
2791 info!("Current key: {}", current_id);
2792 info!("Key history:");
2793 for record in &records {
2794 let marker = if record.key_id == current_id {
2795 " (active)"
2796 } else {
2797 ""
2798 };
2799 info!(
2800 " {} created_at={}{}",
2801 record.key_id, record.created_at, marker
2802 );
2803 if let Some(prev) = &record.previous_key_id {
2804 info!(" previous: {}", prev);
2805 }
2806 }
2807 }
2808 Ok(())
2809}
2810
2811pub fn key_verify(path: &Path, json: bool) -> Result<()> {
2813 let shard_dir = path.join(".shard");
2814 if !shard_dir.exists() {
2815 anyhow::bail!("not a shard repository (run `shard init` first)");
2816 }
2817 let keys_dir = shard_dir.join("keys");
2818 let errors = keychain::verify_keychain(&keys_dir)?;
2819
2820 if json {
2821 info!(
2822 "{}",
2823 serde_json::to_string(&serde_json::json!({
2824 "verified": errors.is_empty(),
2825 "errors": errors,
2826 }))?
2827 );
2828 } else {
2829 if errors.is_empty() {
2830 info!("Keychain verification successful.");
2831 } else {
2832 for err in &errors {
2833 error!("Keychain error: {}", err);
2834 }
2835 anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
2836 }
2837 }
2838 Ok(())
2839}