1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod encryption;
6pub mod index;
7pub mod keychain;
8pub mod manifest;
9pub mod metadata;
10pub mod partial;
11pub mod store;
12pub mod wal;
13
14use crate::commit::Commit;
15use crate::compression::Compression;
16use crate::index::Index;
17use crate::keychain::KeyRotation;
18use crate::manifest::FileManifest;
19use crate::store::Store;
20use anyhow::Result;
21use ed25519_dalek::{Signer, Verifier};
22use metadata::MetadataFormat;
23use serde::Serialize;
24use shard_crypto::KeyPair;
25use shard_net::libp2p::futures::StreamExt;
26use similar::TextDiff;
27use std::collections::HashMap;
28use std::fs;
29use std::io::Write;
30use std::path::Path;
31use std::path::PathBuf;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33use tracing::{error, info, warn};
34
35pub fn init(
36 path: &Path,
37 backend: &str,
38 compression_algo: &str,
39 chunker_mode: &str,
40 chunk_size: Option<u64>,
41 is_private: bool,
42 json: bool,
43) -> Result<()> {
44 let shard_dir = path.join(".shard");
45 if shard_dir.exists() {
46 anyhow::bail!(
47 "repository already initialized at {} (run `shard status` to confirm)",
48 shard_dir.display()
49 );
50 }
51 fs::create_dir_all(shard_dir.join("objects"))?;
52 fs::create_dir_all(shard_dir.join("keys"))?;
53 fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
54 branch::set_head_branch(&shard_dir, "main")?;
55
56 let keys = KeyPair::generate();
57 keys.save(&shard_dir.join("keys"))?;
58 if let Some(global) = global_keys_dir() {
59 fs::create_dir_all(&global).ok();
60 let _ = keys.save(&global);
61 let _ = keychain::init_keychain(&global);
62 }
63 keychain::init_keychain(&shard_dir.join("keys"))?;
64
65 let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
68 let repo_id = blake3::hash(&pubkey).to_hex().to_string();
69 let mut config = load_config(&shard_dir)?;
70
71 if is_private {
72 let key = encryption::generate_repo_key();
73 encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
74 config.insert("private".to_string(), "true".to_string());
75 }
76 config.insert("repo_id".to_string(), repo_id);
77 config.insert("storage_backend".to_string(), backend.to_string());
78 config.insert(
79 "serialization_format".to_string(),
80 MetadataFormat::Json.config_value().to_string(),
81 );
82 config.insert("compression".to_string(), compression_algo.to_string());
83 config.insert("chunker_mode".to_string(), chunker_mode.to_string());
84 match chunker_mode {
85 "rabin" => {
86 let chunk_size = chunk_size.unwrap_or(4_194_304);
87 let min = chunk_size / 4;
88 let max = chunk_size * 2;
89 config.insert("chunk_min".to_string(), min.to_string());
90 config.insert("chunk_avg".to_string(), chunk_size.to_string());
91 config.insert("chunk_max".to_string(), max.to_string());
92 }
93 _ => {
94 let cs = chunk_size.unwrap_or(4_194_304);
95 config.insert("chunk_size".to_string(), cs.to_string());
96 }
97 }
98 save_config(&shard_dir, &config)?;
99
100 let chunker_desc = if chunker_mode == "rabin" {
101 format!(
102 "rabin (avg {} bytes)",
103 config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
104 )
105 } else {
106 format!(
107 "fixed ({} bytes)",
108 config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
109 )
110 };
111 if json {
112 info!(
113 "{}",
114 serde_json::to_string(&serde_json::json!({
115 "path": shard_dir.display().to_string(),
116 "backend": backend,
117 "compression": compression_algo,
118 "chunker": chunker_desc,
119 "private": is_private,
120 }))?
121 );
122 } else {
123 info!(
124 "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
125 shard_dir.display(),
126 backend,
127 compression_algo,
128 chunker_desc,
129 );
130 }
131 Ok(())
132}
133
134fn relative_path(repo_root: &Path, file_path: &Path) -> String {
135 let repo = repo_root
136 .canonicalize()
137 .unwrap_or_else(|_| repo_root.to_path_buf());
138 let file = file_path
139 .canonicalize()
140 .unwrap_or_else(|_| file_path.to_path_buf());
141 file.strip_prefix(&repo)
142 .map(|p| p.to_string_lossy().to_string())
143 .unwrap_or_else(|_| {
144 file_path
145 .file_name()
146 .map(|s| s.to_string_lossy().to_string())
147 .unwrap_or_default()
148 })
149}
150
151fn detect_content_type(file_path: &Path) -> Option<String> {
152 let ext = file_path.extension()?.to_str()?.to_lowercase();
153 let mime = match ext.as_str() {
154 "txt" => "text/plain",
155 "json" => "application/json",
156 "csv" => "text/csv",
157 "png" => "image/png",
158 "jpg" | "jpeg" => "image/jpeg",
159 "gif" => "image/gif",
160 "pdf" => "application/pdf",
161 "yaml" | "yml" => "application/x-yaml",
162 "md" => "text/markdown",
163 "html" | "htm" => "text/html",
164 "py" => "text/x-python",
165 "rs" => "text/x-rust",
166 "ts" => "text/x-typescript",
167 "js" => "application/javascript",
168 "wasm" => "application/wasm",
169 "toml" => "application/toml",
170 "xml" => "application/xml",
171 "zip" => "application/zip",
172 "tar" => "application/x-tar",
173 "gz" => "application/gzip",
174 "bin" => "application/octet-stream",
175 "pt" | "pth" | "ckpt" | "safetensors" => "application/x-model",
176 _ => return None,
177 };
178 Some(mime.to_string())
179}
180
181#[allow(clippy::too_many_arguments)]
182fn add_file(
183 repo_root: &Path,
184 file_path: &Path,
185 store: &Store,
186 index: &mut Index,
187 compression: &Compression,
188 chunker_mode: &chunker::ChunkerMode,
189 cipher: Option<&encryption::RepoCipher>,
190 _json: bool,
191) -> Result<()> {
192 let file = fs::File::open(file_path)?;
193 let mut chunker = match chunker_mode {
194 chunker::ChunkerMode::Fixed { chunk_size } => {
195 chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
196 }
197 chunker::ChunkerMode::Rabin { min, avg, max } => {
198 chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
199 }
200 };
201 let mut chunk_hashes = Vec::new();
202 let mut total_size = 0;
203
204 while let Some(chunk) = chunker.next_chunk()? {
205 let hash = chunk.hash;
206 let compressed_data = compression.compress(&chunk.data)?;
207 let stored_data = match cipher {
208 Some(c) => c.encrypt(&compressed_data),
209 None => compressed_data,
210 };
211 let stored = crate::chunker::Chunk {
212 hash,
213 data: stored_data,
214 offset: chunk.offset,
215 };
216 store.put_chunk(&stored)?;
217 chunk_hashes.push(hash.to_hex().to_string());
218 total_size += chunk.data.len() as u64;
219 }
220
221 let name = relative_path(repo_root, file_path);
222 let manifest = FileManifest {
223 name: name.clone(),
224 size: total_size,
225 chunks: chunk_hashes.clone(),
226 content_type: detect_content_type(file_path),
227 compression: compression.as_str().to_string(),
228 merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
229 created_by: None,
230 created_at: None,
231 signature: None,
232 };
233
234 index.files.insert(name.clone(), manifest);
235 if !_json {
236 info!("Added {} ({})", name, total_size);
237 }
238 Ok(())
239}
240
241pub fn recover(path: &Path, json: bool) -> Result<()> {
242 let shard_dir = path.join(".shard");
243 if !shard_dir.exists() {
244 anyhow::bail!("not a shard repository (run `shard init` first)");
245 }
246 wal::recover(&shard_dir)?;
247 if json {
248 info!(
249 "{}",
250 serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
251 );
252 } else {
253 info!("Recovery complete.");
254 }
255 Ok(())
256}
257
258pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
259 let shard_dir = path.join(".shard");
260 if !shard_dir.exists() {
261 anyhow::bail!("not a shard repository (run `shard init` first)");
262 }
263
264 wal::recover(&shard_dir)?;
265
266 let config = load_config(&shard_dir)?;
267 let compression: Compression = config
268 .get("compression")
269 .map(|s| s.as_str())
270 .unwrap_or("zstd")
271 .parse()?;
272
273 let chunker_mode = chunker::ChunkerMode::from_config(&config);
274 let fmt = MetadataFormat::from_config(&config);
275
276 let store = Store::open(&shard_dir)?;
277 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
278
279 let cipher = maybe_load_cipher(&shard_dir)?;
280
281 if file_path.is_dir() {
282 for entry in walkdir::WalkDir::new(file_path)
283 .into_iter()
284 .filter_entry(|e| {
285 e.file_name()
286 .to_str()
287 .map(|s| !s.starts_with('.'))
288 .unwrap_or(false)
289 })
290 {
291 let entry = entry?;
292 if entry.file_type().is_file() {
293 add_file(
294 path,
295 entry.path(),
296 &store,
297 &mut index,
298 &compression,
299 &chunker_mode,
300 cipher.as_ref(),
301 json,
302 )?;
303 }
304 }
305 } else {
306 add_file(
307 path,
308 file_path,
309 &store,
310 &mut index,
311 &compression,
312 &chunker_mode,
313 cipher.as_ref(),
314 json,
315 )?;
316 }
317
318 index.save(&shard_dir.join("index"), &fmt)?;
319 if json {
320 info!(
321 "{}",
322 serde_json::to_string(&serde_json::json!({"status": "added"}))?
323 );
324 }
325 Ok(())
326}
327
328pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
329 let shard_dir = path.join(".shard");
330 if !shard_dir.exists() {
331 anyhow::bail!("not a shard repository (run `shard init` first)");
332 }
333
334 wal::recover(&shard_dir)?;
336
337 let config = load_config(&shard_dir)?;
338 let fmt = MetadataFormat::from_config(&config);
339
340 let store = Store::open(&shard_dir)?;
341 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
342
343 if index.files.is_empty() {
344 anyhow::bail!("nothing to commit (stage files with `shard add` first)");
345 }
346
347 let head_path = shard_dir.join("HEAD");
348
349 let wal = wal::Wal::new(&shard_dir);
351 let head_backup = fs::read_to_string(&head_path).ok();
352 let index_backup = fs::read(shard_dir.join("index"))?;
353 wal.append(&wal::WalEntry::CommitBegin {
354 head_backup,
355 index_backup,
356 })?;
357
358 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
360 let keys = load_keypair(&shard_dir)?;
361 let signing_key = keys.signing_key;
362 let mut manifest_ids = Vec::new();
363 for manifest in index.files.values_mut() {
364 manifest.created_by = Some(author.to_string());
365 manifest.created_at = Some(timestamp);
366
367 let mut unsigned = manifest.clone();
368 unsigned.signature = None;
369 let canonical = metadata::serialize_for_signing(&unsigned);
370 let sig = signing_key.sign(&canonical);
371 manifest.signature = Some(hex::encode(sig.to_bytes()));
372
373 let encoded = metadata::serialize(manifest, &fmt);
374 let hash = blake3::hash(&encoded);
375 let chunk = crate::chunker::Chunk {
376 hash,
377 data: encoded,
378 offset: 0,
379 };
380 store.put_chunk(&chunk)?;
381 manifest_ids.push(hash.to_hex().to_string());
382 }
383 manifest_ids.sort();
384
385 let mut parents = Vec::new();
387 let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
388 if let Some(pid) = parent_id {
389 parents.push(pid);
390 }
391
392 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
394 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
395 let mut commit = Commit {
396 commit_id: String::new(),
397 parents,
398 manifests: manifest_ids,
399 author: author.to_string(),
400 message: message.to_string(),
401 timestamp,
402 public_key: Some(public_key_hex),
403 signature: None,
404 key_id,
405 };
406
407 let json_unsigned = metadata::serialize_for_signing(&commit);
409 let signature = signing_key.sign(&json_unsigned);
410 commit.signature = Some(hex::encode(signature.to_bytes()));
411
412 let encoded = metadata::serialize(&commit, &fmt);
414 let hash = blake3::hash(&encoded);
415 let chunk = crate::chunker::Chunk {
416 hash,
417 data: encoded,
418 offset: 0,
419 };
420 store.put_chunk(&chunk)?;
421
422 let commit_id = hash.to_hex().to_string();
424 if has_dag_cycle(&store, &commit.parents, &commit_id)? {
425 anyhow::bail!(
426 "Cycle detected: commit {} is already an ancestor of one or more parents",
427 commit_id
428 );
429 }
430
431 if let Some(ref branch_name) = current_branch {
433 branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
434 branch::set_head_branch(&shard_dir, branch_name)?;
435 } else {
436 fs::write(&head_path, &commit_id)?;
437 }
438
439 index.files.clear();
441 index.save(&shard_dir.join("index"), &fmt)?;
442
443 wal.append(&wal::WalEntry::CommitEnd)?;
445 wal.truncate()?;
446
447 if json {
448 info!(
449 "{}",
450 serde_json::to_string(&serde_json::json!({
451 "commit_id": commit_id,
452 "message": message,
453 }))?
454 );
455 } else {
456 info!("Committed {} ({})", commit_id, message);
457 }
458 Ok(())
459}
460
461pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
462 let shard_dir = path.join(".shard");
463 if !shard_dir.exists() {
464 anyhow::bail!("not a shard repository (run `shard init` first)");
465 }
466
467 if commit_id.len() < 2 {
468 anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
469 }
470 let store = Store::open(&shard_dir)?;
471 let cipher = maybe_load_cipher(&shard_dir)?;
472 let commit_data = store.get_chunk(commit_id)?;
473
474 let stored_hash = blake3::hash(&commit_data);
476 if stored_hash.to_hex().to_string() != commit_id {
477 anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
478 }
479
480 let commit: Commit = metadata::deserialize(&commit_data)?;
481
482 let mut sig_verified = false;
483 let mut files_checked = 0u64;
484
485 if let Some(kid) = &commit.key_id {
487 if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
488 anyhow::bail!("Keychain verification failed: {}", e);
489 } else if !json {
490 info!("Keychain: key {} was active at commit time.", kid);
491 }
492 }
493
494 if let Some(sig_hex) = &commit.signature {
495 let verifying_key = if let Some(pk_hex) = &commit.public_key {
496 let pk_bytes = hex::decode(pk_hex)?;
497 ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
498 } else {
499 let pub_bytes = load_public_key(&shard_dir)?;
500 ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
501 };
502
503 let mut unsigned_commit = commit.clone();
504 unsigned_commit.signature = None;
505 let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);
506
507 let sig_bytes = hex::decode(sig_hex)?;
508 let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
509
510 verifying_key.verify(&json_unsigned, &signature)?;
511 sig_verified = true;
512 if !json {
513 info!("Signature verified.");
514 }
515 } else if !json {
516 info!("Warning: Commit is unsigned.");
517 }
518
519 for manifest_id in &commit.manifests {
520 let manifest_data = store.get_chunk(manifest_id)?;
521 let hash = blake3::hash(&manifest_data);
522 if hash.to_hex().to_string() != *manifest_id {
523 anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
524 }
525
526 let manifest: FileManifest = metadata::deserialize(&manifest_data)?;
527
528 if let Some(sig_hex) = &manifest.signature {
530 let pk_bytes = if let Some(pk_hex) = &commit.public_key {
531 hex::decode(pk_hex)?
532 } else {
533 load_public_key(&shard_dir)?
534 };
535 let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
536 let mut unsigned = manifest.clone();
537 unsigned.signature = None;
538 let canonical = metadata::serialize_for_signing(&unsigned);
539 let sig_bytes = hex::decode(sig_hex)?;
540 let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
541 vk.verify(&canonical, &sig)?;
542 if !json {
543 info!(" Manifest signature verified for: {}", manifest.name);
544 }
545 }
546
547 let compression = manifest.compression.parse::<Compression>()?;
548 if !json {
549 info!(
550 "Verifying file: {} (compression: {})",
551 manifest.name, manifest.compression
552 );
553 }
554
555 if let Some(ref mr) = manifest.merkle_root {
557 let computed = FileManifest::merkle_root(&manifest.chunks);
558 if mr != &computed {
559 anyhow::bail!(
560 "merkle root mismatch for '{}': manifest says {} but computed {}",
561 manifest.name,
562 mr,
563 computed
564 );
565 }
566 }
567
568 for chunk_id in &manifest.chunks {
569 let chunk_data = store.get_chunk(chunk_id)?;
570 let decrypted = match &cipher {
571 Some(c) => c.decrypt(&chunk_data)?,
572 None => chunk_data,
573 };
574 let decompressed = compression.decompress(&decrypted)?;
575 let hash = blake3::hash(&decompressed);
576 if hash.to_hex().to_string() != *chunk_id {
577 anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
578 }
579 }
580 files_checked += 1;
581 }
582
583 if json {
584 info!(
585 "{}",
586 serde_json::to_string(&serde_json::json!({
587 "commit_id": commit_id,
588 "verified": true,
589 "signature_verified": sig_verified,
590 "files_checked": files_checked,
591 }))?
592 );
593 } else {
594 info!("Verification successful.");
595 }
596 Ok(())
597}
598
599fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
600 if commit_id.len() < 2 {
601 anyhow::bail!(
602 "commit id too short (got {} chars, need at least 2): '{}'",
603 commit_id.len(),
604 commit_id
605 );
606 };
607 let data = store.get_chunk(commit_id)?;
608 let mut commit: Commit = metadata::deserialize(&data)?;
609 commit.commit_id = commit_id.to_string();
610 Ok(commit)
611}
612
613fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
614 let mut seen = std::collections::HashSet::new();
615 let mut stack: Vec<String> = parents.to_vec();
616 while let Some(cid) = stack.pop() {
617 if cid == commit_id {
618 return Ok(true);
619 }
620 if !seen.insert(cid.clone()) {
621 continue;
622 }
623 if let Ok(data) = store.get_chunk(&cid) {
624 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
625 for p in &commit.parents {
626 stack.push(p.clone());
627 }
628 }
629 }
630 }
631 Ok(false)
632}
633
634#[derive(Serialize)]
635pub struct LogEntry {
636 pub commit_id: String,
637 pub parents: Vec<String>,
638 pub manifests: Vec<String>,
639 pub author: String,
640 pub message: String,
641 pub timestamp: u64,
642 pub signature: Option<String>,
643}
644
645impl From<Commit> for LogEntry {
646 fn from(c: Commit) -> Self {
647 LogEntry {
648 commit_id: c.commit_id,
649 parents: c.parents,
650 manifests: c.manifests,
651 author: c.author,
652 message: c.message,
653 timestamp: c.timestamp,
654 signature: c.signature,
655 }
656 }
657}
658
659pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
660 let shard_dir = path.join(".shard");
661 if !shard_dir.exists() {
662 anyhow::bail!("not a shard repository (run `shard init` first)");
663 }
664
665 let store = Store::open(&shard_dir)?;
666
667 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
668 let head = head_commit
669 .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
670
671 let mut entries: Vec<LogEntry> = Vec::new();
672 let mut seen = std::collections::HashSet::new();
673 let mut stack = vec![head];
674
675 while let Some(cid) = stack.pop() {
676 if !seen.insert(cid.clone()) {
677 continue;
678 }
679 let commit = load_commit(&store, &cid)?;
680 for parent in &commit.parents {
681 stack.push(parent.clone());
682 }
683 entries.push(commit.into());
684 }
685
686 if json {
687 info!("{}", serde_json::to_string_pretty(&entries)?);
688 } else {
689 for entry in &entries {
690 let ts = {
691 let secs = entry.timestamp as i64;
692 let tm = time::OffsetDateTime::from_unix_timestamp(secs)
693 .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
694 tm.format(&time::format_description::well_known::Rfc3339)
695 .unwrap_or_else(|_| entry.timestamp.to_string())
696 };
697 info!("commit {}", entry.commit_id);
698 if !entry.parents.is_empty() {
699 info!("parents: {}", entry.parents.join(" "));
700 }
701 info!("author: {}", entry.author);
702 info!("date: {}", ts);
703 info!("");
704 for line in entry.message.lines() {
705 info!(" {}", line);
706 }
707 info!("");
708 }
709 }
710
711 Ok(())
712}
713
714fn reconstruct_file(
715 store: &Store,
716 manifest: &FileManifest,
717 cipher: Option<&encryption::RepoCipher>,
718) -> Result<Vec<u8>> {
719 let compression: Compression = manifest.compression.parse()?;
720 let mut data = Vec::new();
721 for chunk_id in &manifest.chunks {
722 let chunk_data = store.get_chunk(chunk_id)?;
723 let decrypted = match cipher {
724 Some(c) => c.decrypt(&chunk_data)?,
725 None => chunk_data,
726 };
727 let decompressed = compression.decompress(&decrypted)?;
728 data.extend_from_slice(&decompressed);
729 }
730 Ok(data)
731}
732
733pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
734 let shard_dir = path.join(".shard");
735 if !shard_dir.exists() {
736 anyhow::bail!("not a shard repository (run `shard init` first)");
737 }
738
739 let store = Store::open(&shard_dir)?;
740 let cipher = maybe_load_cipher(&shard_dir)?;
741
742 let cid_b = match commit_b {
743 Some(c) => branch::resolve_rev(&shard_dir, c)?,
744 None => {
745 let (_, head) = branch::resolve_head(&shard_dir)?;
746 head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
747 }
748 };
749 let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;
750
751 let commit1 = load_commit(&store, &cid_a)?;
752 let commit2 = load_commit(&store, &cid_b)?;
753
754 let mut files1: HashMap<String, FileManifest> = HashMap::new();
755 for mid in &commit1.manifests {
756 let data = store.get_chunk(mid)?;
757 let m: FileManifest = metadata::deserialize(&data)?;
758 files1.insert(m.name.clone(), m);
759 }
760
761 let mut files2: HashMap<String, FileManifest> = HashMap::new();
762 for mid in &commit2.manifests {
763 let data = store.get_chunk(mid)?;
764 let m: FileManifest = metadata::deserialize(&data)?;
765 files2.insert(m.name.clone(), m);
766 }
767
768 let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
769 all_names.sort();
770 all_names.dedup();
771
772 let mut changes: Vec<serde_json::Value> = Vec::new();
773 let mut diff_found = false;
774
775 for name in all_names {
776 match (files1.get(name), files2.get(name)) {
777 (None, Some(manifest)) => {
778 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
779 let text = String::from_utf8_lossy(&content);
780 diff_found = true;
781 if json {
782 changes.push(serde_json::json!({
783 "type": "added",
784 "file": name,
785 "lines": text.lines().collect::<Vec<_>>(),
786 }));
787 } else {
788 info!("--- /dev/null");
789 info!("+++ b/{}", name);
790 let lines: Vec<&str> = text.lines().collect();
791 info!("@@ -0,0 +1,{} @@", lines.len());
792 for line in &lines {
793 info!("+{}", line);
794 }
795 }
796 }
797 (Some(manifest), None) => {
798 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
799 let text = String::from_utf8_lossy(&content);
800 diff_found = true;
801 if json {
802 changes.push(serde_json::json!({
803 "type": "removed",
804 "file": name,
805 "lines": text.lines().collect::<Vec<_>>(),
806 }));
807 } else {
808 info!("--- a/{}", name);
809 info!("+++ /dev/null");
810 let lines: Vec<&str> = text.lines().collect();
811 info!("@@ -1,{} +0,0 @@", lines.len());
812 for line in &lines {
813 info!("-{}", line);
814 }
815 }
816 }
817 (Some(ma), Some(mb)) => {
818 if ma.chunks == mb.chunks {
819 continue;
820 }
821 let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
822 let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
823 diff_found = true;
824 if json {
825 let text_a = String::from_utf8_lossy(&content_a);
826 let text_b = String::from_utf8_lossy(&content_b);
827 changes.push(serde_json::json!({
828 "type": "modified",
829 "file": name,
830 "old_lines": text_a.lines().collect::<Vec<_>>(),
831 "new_lines": text_b.lines().collect::<Vec<_>>(),
832 }));
833 } else {
834 let text_a = String::from_utf8_lossy(&content_a);
835 let text_b = String::from_utf8_lossy(&content_b);
836 let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
837 let mut buf: Vec<u8> = Vec::new();
838 diff.unified_diff()
839 .header(&format!("a/{}", name), &format!("b/{}", name))
840 .to_writer(&mut buf)
841 .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
842 let output = String::from_utf8_lossy(&buf);
843 for line in output.lines() {
844 info!("{}", line);
845 }
846 }
847 }
848 (None, None) => {}
849 }
850 }
851
852 if !diff_found {
853 if json {
854 info!(
855 "{}",
856 serde_json::to_string(
857 &serde_json::json!({"changes": changes, "message": "no differences"})
858 )?
859 );
860 } else {
861 info!("No differences between the commits.");
862 }
863 return Ok(());
864 }
865
866 if json {
867 info!(
868 "{}",
869 serde_json::to_string(&serde_json::json!({"changes": changes}))?
870 );
871 }
872
873 Ok(())
874}
875
876pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
877 let shard_dir = path.join(".shard");
878 if !shard_dir.exists() {
879 anyhow::bail!("not a shard repository (run `shard init` first)");
880 }
881
882 let store = Store::open(&shard_dir)?;
883 let cipher = maybe_load_cipher(&shard_dir)?;
884
885 let branch_path = shard_dir.join("refs").join("heads").join(target);
888 let commit_id = if branch_path.exists() {
889 fs::read_to_string(&branch_path)?.trim().to_string()
890 } else {
891 target.to_string()
892 };
893
894 let commit = load_commit(&store, &commit_id)?;
896
897 if branch_path.exists() {
899 branch::set_head_branch(&shard_dir, target)?;
900 } else {
901 branch::set_head_commit(&shard_dir, target)?;
902 }
903 let mut files = Vec::new();
904
905 for manifest_id in &commit.manifests {
906 let data = store.get_chunk(manifest_id)?;
907 let hash = blake3::hash(&data);
908 if hash.to_hex().to_string() != *manifest_id {
909 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
910 }
911 let manifest: FileManifest = metadata::deserialize(&data)?;
912 let compression = manifest.compression.parse::<Compression>()?;
913 if !json {
914 info!(
915 "Checking out file: {} (compression: {})",
916 manifest.name, manifest.compression
917 );
918 }
919
920 let mut file_data = Vec::new();
921 for chunk_id in &manifest.chunks {
922 let chunk_data = store.get_chunk(chunk_id)?;
923 let decrypted = match &cipher {
924 Some(c) => c.decrypt(&chunk_data)?,
925 None => chunk_data,
926 };
927 let decompressed = compression.decompress(&decrypted)?;
928 file_data.extend_from_slice(&decompressed);
929 }
930 fs::write(path.join(&manifest.name), file_data)?;
931 if !json {
932 info!(" -> {}", manifest.name);
933 }
934 files.push(manifest.name);
935 }
936
937 if json {
938 info!(
939 "{}",
940 serde_json::to_string(&serde_json::json!({
941 "commit_id": commit_id,
942 "files": files,
943 }))?
944 );
945 } else {
946 info!("Checkout complete.");
947 }
948 Ok(())
949}
950
951pub fn status(path: &Path, json: bool) -> Result<()> {
952 let shard_dir = path.join(".shard");
953 if !shard_dir.exists() {
954 anyhow::bail!("not a shard repository (run `shard init` first)");
955 }
956
957 let config = load_config(&shard_dir)?;
958 let fmt = MetadataFormat::from_config(&config);
959
960 let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
961 let mut commit_id: Option<String> = None;
962 if let Some(cid) = head_commit {
963 commit_id = Some(cid);
964 if !json {
965 if let Some(ref branch) = current_branch {
966 info!("On branch: {}", branch);
967 } else {
968 info!("HEAD detached at {}", commit_id.as_ref().unwrap());
969 }
970 }
971 } else if !json {
972 info!("No commits yet.");
973 }
974
975 let index = Index::load(&shard_dir.join("index"), &fmt)?;
976 let staged: Vec<String> = index.files.keys().cloned().collect();
977 if !json {
978 if staged.is_empty() {
979 info!("Nothing staged.");
980 } else {
981 info!("\nStaged files:");
982 for name in &staged {
983 info!(" {} (to be committed)", name);
984 }
985 }
986 }
987
988 let store = Store::open(&shard_dir)?;
989 let mut deleted = Vec::new();
990 let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
991 let mut names = std::collections::HashSet::new();
992 if let Ok(commit) = load_commit(&store, head) {
993 for manifest_id in &commit.manifests {
994 if let Ok(data) = store.get_chunk(manifest_id) {
995 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
996 let file_path = path.join(&manifest.name);
997 if !file_path.exists() {
998 deleted.push(manifest.name.clone());
999 }
1000 names.insert(manifest.name);
1001 }
1002 }
1003 }
1004 }
1005 names
1006 } else {
1007 std::collections::HashSet::new()
1008 };
1009
1010 if !json && !deleted.is_empty() {
1011 info!("\nDeleted files:");
1012 for name in &deleted {
1013 info!(" {} (deleted)", name);
1014 }
1015 }
1016
1017 let mut untracked = Vec::new();
1018 for entry in walkdir::WalkDir::new(path)
1019 .min_depth(1)
1020 .into_iter()
1021 .filter_map(|e| e.ok())
1022 {
1023 if entry.file_type().is_file() || entry.file_type().is_dir() {
1024 let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1025 let name = rel_path.to_string_lossy().to_string();
1026 let is_hidden = rel_path
1027 .components()
1028 .any(|c| c.as_os_str().to_string_lossy().starts_with('.'));
1029 if !is_hidden
1030 && entry.path() != shard_dir
1031 && !entry.path().starts_with(&shard_dir)
1032 && !index.files.contains_key(&name)
1033 && !tracked_names.contains(&name)
1034 && entry.file_type().is_file()
1035 {
1036 untracked.push(name);
1037 }
1038 }
1039 }
1040 if !json && !untracked.is_empty() {
1041 info!("\nUntracked files:");
1042 for name in &untracked {
1043 info!(" {}", name);
1044 }
1045 }
1046
1047 if json {
1048 info!(
1049 "{}",
1050 serde_json::to_string(&serde_json::json!({
1051 "commit": commit_id,
1052 "staged": staged,
1053 "deleted": deleted,
1054 "untracked": untracked,
1055 }))?
1056 );
1057 }
1058
1059 Ok(())
1060}
1061
1062fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1063 let config_path = shard_dir.join("config.json");
1064 if config_path.exists() {
1065 let data = fs::read(&config_path)?;
1066 Ok(metadata::deserialize(&data)?)
1067 } else {
1068 Ok(std::collections::BTreeMap::new())
1069 }
1070}
1071
1072fn save_config(
1073 shard_dir: &Path,
1074 config: &std::collections::BTreeMap<String, String>,
1075) -> Result<()> {
1076 let fmt = MetadataFormat::from_config(config);
1077 let data = metadata::serialize(config, &fmt);
1078 fs::write(shard_dir.join("config.json"), data)?;
1079 Ok(())
1080}
1081
1082fn global_keys_dir() -> Option<PathBuf> {
1083 std::env::var("HOME")
1084 .ok()
1085 .map(|h| Path::new(&h).join(".shard").join("keys"))
1086}
1087
1088fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1089 let local = shard_dir.join("keys");
1090 if local.join("secret.key").exists() {
1091 return KeyPair::load(&local);
1092 }
1093 if let Some(global) = global_keys_dir() {
1094 if global.join("secret.key").exists() {
1095 return KeyPair::load(&global);
1096 }
1097 }
1098 anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1099}
1100
1101fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1102 let local = shard_dir.join("keys/public.key");
1103 if local.exists() {
1104 return Ok(fs::read(&local)?);
1105 }
1106 if let Some(global) = global_keys_dir() {
1107 let gp = global.join("public.key");
1108 if gp.exists() {
1109 return Ok(fs::read(&gp)?);
1110 }
1111 }
1112 anyhow::bail!(
1113 "no public key found in {} or ~/.shard/keys/",
1114 local.display()
1115 )
1116}
1117
1118fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1119 let config = load_config(shard_dir)?;
1120 if config.get("private").map(|s| s.as_str()) == Some("true") {
1121 let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1122 Ok(Some(encryption::RepoCipher::from_key(&key)))
1123 } else {
1124 Ok(None)
1125 }
1126}
1127
1128pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1129 let shard_dir = path.join(".shard");
1130 if !shard_dir.exists() {
1131 anyhow::bail!("not a shard repository (run `shard init` first)");
1132 }
1133 let config = load_config(&shard_dir)?;
1134 if let Some(key) = key {
1135 match config.get(key) {
1136 Some(value) => info!("{} = {}", key, value),
1137 None => anyhow::bail!("config key not found: {}", key),
1138 }
1139 } else {
1140 for (k, v) in &config {
1141 info!("{} = {}", k, v);
1142 }
1143 }
1144 Ok(())
1145}
1146
1147pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1148 let shard_dir = path.join(".shard");
1149 if !shard_dir.exists() {
1150 anyhow::bail!("not a shard repository (run `shard init` first)");
1151 }
1152 let mut config = load_config(&shard_dir)?;
1153 config.insert(key.to_string(), value.to_string());
1154 save_config(&shard_dir, &config)?;
1155 info!("{} = {}", key, value);
1156 Ok(())
1157}
1158
1159fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1160 let tags_path = shard_dir.join("tags.json");
1161 if tags_path.exists() {
1162 let data = fs::read(&tags_path)?;
1163 Ok(serde_json::from_slice(&data)?)
1164 } else {
1165 Ok(std::collections::BTreeMap::new())
1166 }
1167}
1168
1169fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
1170 let data = serde_json::to_string_pretty(tags)?;
1171 fs::write(shard_dir.join("tags.json"), data)?;
1172 Ok(())
1173}
1174
1175pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1176 let shard_dir = path.join(".shard");
1177 if !shard_dir.exists() {
1178 anyhow::bail!("not a shard repository (run `shard init` first)");
1179 }
1180 let store = Store::open(&shard_dir)?;
1182 load_commit(&store, commit_id)?;
1183 let mut tags = load_tags(&shard_dir)?;
1184 tags.insert(name.to_string(), commit_id.to_string());
1185 save_tags(&shard_dir, &tags)?;
1186 info!("Tagged '{}' -> {}", name, commit_id);
1187 Ok(())
1188}
1189
1190pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1191 let shard_dir = path.join(".shard");
1192 if !shard_dir.exists() {
1193 anyhow::bail!("not a shard repository (run `shard init` first)");
1194 }
1195 let id = match commit_id {
1196 Some(cid) => cid.to_string(),
1197 None => {
1198 let (_, head) = branch::resolve_head(&shard_dir)?;
1199 head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1200 }
1201 };
1202 let store = Store::open(&shard_dir)?;
1204 load_commit(&store, &id)?;
1205 branch::create_branch(&shard_dir, name, &id)
1206}
1207
1208pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1209 let shard_dir = path.join(".shard");
1210 if !shard_dir.exists() {
1211 anyhow::bail!("not a shard repository (run `shard init` first)");
1212 }
1213 branch::delete_branch(&shard_dir, name)
1214}
1215
1216pub fn branch_list(path: &Path) -> Result<()> {
1217 let shard_dir = path.join(".shard");
1218 if !shard_dir.exists() {
1219 anyhow::bail!("not a shard repository (run `shard init` first)");
1220 }
1221 let (current, branches) = branch::list_branches(&shard_dir)?;
1222 if branches.is_empty() {
1223 info!("No branches.");
1224 return Ok(());
1225 }
1226 for (name, commit_id) in &branches {
1227 let prefix = if current.as_deref() == Some(name) {
1228 "* "
1229 } else {
1230 " "
1231 };
1232 info!(
1233 "{}{} ({})",
1234 prefix,
1235 name,
1236 &commit_id[..8.min(commit_id.len())]
1237 );
1238 }
1239 Ok(())
1240}
1241
1242pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1243 let shard_dir = path.join(".shard");
1244 if !shard_dir.exists() {
1245 anyhow::bail!("not a shard repository (run `shard init` first)");
1246 }
1247
1248 let store = Store::open(&shard_dir)?;
1249
1250 let config = load_config(&shard_dir)?;
1251 let fmt = MetadataFormat::from_config(&config);
1252
1253 let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1255 let current_id =
1256 current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1257
1258 let source_id = branch::resolve_rev(&shard_dir, branch)?;
1260 if source_id == current_id {
1261 anyhow::bail!("Already up to date — source is the same commit as HEAD");
1262 }
1263
1264 let current_commit = load_commit(&store, ¤t_id)?;
1266 let source_commit = load_commit(&store, &source_id)?;
1267
1268 let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1270 std::collections::HashMap::new();
1271
1272 for manifest_id in ¤t_commit.manifests {
1273 let data = store.get_chunk(manifest_id)?;
1274 let manifest: FileManifest = metadata::deserialize(&data)?;
1275 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1276 }
1277
1278 for manifest_id in &source_commit.manifests {
1279 let data = store.get_chunk(manifest_id)?;
1280 let manifest: FileManifest = metadata::deserialize(&data)?;
1281 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1282 }
1283
1284 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1286 let keys = load_keypair(&shard_dir)?;
1287 let signing_key = keys.signing_key;
1288 let mut merged_manifest_ids = Vec::new();
1289 for (name, chunks) in merged_manifests.values() {
1290 let compression = Compression::None;
1291 let mut manifest = FileManifest {
1292 name: name.clone(),
1293 size: 0,
1294 chunks: chunks.clone(),
1295 content_type: None,
1296 compression: compression.as_str().to_string(),
1297 merkle_root: Some(FileManifest::merkle_root(chunks)),
1298 created_by: Some(author.to_string()),
1299 created_at: Some(timestamp),
1300 signature: None,
1301 };
1302
1303 let mut unsigned = manifest.clone();
1304 unsigned.signature = None;
1305 let canonical = metadata::serialize_for_signing(&unsigned);
1306 let sig = signing_key.sign(&canonical);
1307 manifest.signature = Some(hex::encode(sig.to_bytes()));
1308
1309 let encoded = metadata::serialize(&manifest, &fmt);
1310 let hash = blake3::hash(&encoded);
1311 store.put_chunk(&crate::chunker::Chunk {
1312 hash,
1313 data: encoded,
1314 offset: 0,
1315 })?;
1316 merged_manifest_ids.push(hash.to_hex().to_string());
1317 }
1318 merged_manifest_ids.sort();
1319
1320 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1322 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1323 let parents = vec![current_id.clone(), source_id.clone()];
1324 let mut commit = Commit {
1325 commit_id: String::new(),
1326 parents,
1327 manifests: merged_manifest_ids,
1328 author: author.to_string(),
1329 message: message.to_string(),
1330 timestamp,
1331 public_key: Some(public_key_hex),
1332 signature: None,
1333 key_id,
1334 };
1335
1336 let json_unsigned = metadata::serialize_for_signing(&commit);
1337 let signature = signing_key.sign(&json_unsigned);
1338 commit.signature = Some(hex::encode(signature.to_bytes()));
1339
1340 let encoded = metadata::serialize(&commit, &fmt);
1341 let hash = blake3::hash(&encoded);
1342 store.put_chunk(&crate::chunker::Chunk {
1343 hash,
1344 data: encoded,
1345 offset: 0,
1346 })?;
1347
1348 let merge_commit_id = hash.to_hex().to_string();
1349
1350 if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1352 anyhow::bail!(
1353 "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1354 merge_commit_id
1355 );
1356 }
1357
1358 if let Some(ref branch_name) = current_branch {
1360 branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1361 branch::set_head_branch(&shard_dir, branch_name)?;
1362 } else {
1363 branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1364 }
1365
1366 if json {
1367 info!(
1368 "{}",
1369 serde_json::to_string(&serde_json::json!({
1370 "commit_id": merge_commit_id,
1371 "message": message,
1372 }))?
1373 );
1374 } else {
1375 info!("Merge commit {} ({})", merge_commit_id, message);
1376 }
1377 Ok(())
1378}
1379
1380pub fn tag_list(path: &Path) -> Result<()> {
1381 let shard_dir = path.join(".shard");
1382 if !shard_dir.exists() {
1383 anyhow::bail!("not a shard repository (run `shard init` first)");
1384 }
1385 let tags = load_tags(&shard_dir)?;
1386 if tags.is_empty() {
1387 info!("No tags.");
1388 } else {
1389 for (name, commit_id) in &tags {
1390 info!("{} -> {}", name, commit_id);
1391 }
1392 }
1393 Ok(())
1394}
1395
1396fn collect_reachable(
1397 store: &Store,
1398 commit_id: &str,
1399 seen_commits: &mut std::collections::HashSet<String>,
1400 reachable: &mut std::collections::HashSet<String>,
1401) -> Result<()> {
1402 if !seen_commits.insert(commit_id.to_string()) {
1403 return Ok(());
1404 }
1405
1406 reachable.insert(commit_id.to_string());
1407
1408 let commit = match load_commit(store, commit_id) {
1409 Ok(c) => c,
1410 Err(_) => return Ok(()),
1411 };
1412
1413 for manifest_id in &commit.manifests {
1414 reachable.insert(manifest_id.clone());
1415
1416 if let Ok(data) = store.get_chunk(manifest_id) {
1417 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1418 for chunk_id in &manifest.chunks {
1419 reachable.insert(chunk_id.clone());
1420 }
1421 }
1422 }
1423 }
1424
1425 for parent_id in &commit.parents {
1426 collect_reachable(store, parent_id, seen_commits, reachable)?;
1427 }
1428
1429 Ok(())
1430}
1431
1432pub fn prune(path: &Path, json: bool) -> Result<()> {
1433 let shard_dir = path.join(".shard");
1434 if !shard_dir.exists() {
1435 anyhow::bail!("not a shard repository (run `shard init` first)");
1436 }
1437
1438 let config = load_config(&shard_dir)?;
1439 let fmt = MetadataFormat::from_config(&config);
1440
1441 let store = Store::open(&shard_dir)?;
1442 let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1443
1444 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1446 if let Some(ref head) = head_commit {
1447 collect_reachable(
1448 &store,
1449 head,
1450 &mut std::collections::HashSet::new(),
1451 &mut reachable,
1452 )?;
1453 }
1454
1455 if let Ok(branches) = branch::list_branches(&shard_dir) {
1457 for (_, commit_id) in branches.1 {
1458 collect_reachable(
1459 &store,
1460 &commit_id,
1461 &mut std::collections::HashSet::new(),
1462 &mut reachable,
1463 )?;
1464 }
1465 }
1466
1467 let tags = load_tags(&shard_dir)?;
1469 for commit_id in tags.values() {
1470 collect_reachable(
1471 &store,
1472 commit_id,
1473 &mut std::collections::HashSet::new(),
1474 &mut reachable,
1475 )?;
1476 }
1477
1478 let index = Index::load(&shard_dir.join("index"), &fmt)?;
1480 for manifest in index.files.values() {
1481 let json = metadata::serialize(manifest, &fmt);
1482 let hash = blake3::hash(&json);
1483 let hash_hex = hash.to_hex().to_string();
1484 reachable.insert(hash_hex);
1485 for chunk_hash in &manifest.chunks {
1486 reachable.insert(chunk_hash.clone());
1487 }
1488 }
1489
1490 let all_chunks = store.iter_chunks()?;
1492 let mut pruned = 0u64;
1493 let mut kept = 0u64;
1494 for (hash_hex, full_path) in &all_chunks {
1495 if !reachable.contains(hash_hex) {
1496 store.delete_chunk(hash_hex, Some(full_path))?;
1497 pruned += 1;
1498 } else {
1499 kept += 1;
1500 }
1501 }
1502
1503 if json {
1504 info!(
1505 "{}",
1506 serde_json::to_string(&serde_json::json!({
1507 "pruned": pruned,
1508 "remaining": kept,
1509 }))?
1510 );
1511 } else {
1512 info!("Pruned {} objects. {} objects remain.", pruned, kept);
1513 }
1514 Ok(())
1515}
1516
1517pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1518 let shard_dir = path.join(".shard");
1519 if !shard_dir.exists() {
1520 anyhow::bail!("not a shard repository (run `shard init` first)");
1521 }
1522
1523 if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1525 anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1526 }
1527
1528 let peers_path = shard_dir.join("peers.json");
1529 let mut peers: Vec<String> = if peers_path.exists() {
1530 let data = fs::read(&peers_path)?;
1531 serde_json::from_slice(&data)?
1532 } else {
1533 Vec::new()
1534 };
1535
1536 let added = if !peers.contains(&multiaddr.to_string()) {
1537 peers.push(multiaddr.to_string());
1538 let data = serde_json::to_vec(&peers)?;
1539 fs::write(peers_path, data)?;
1540 true
1541 } else {
1542 false
1543 };
1544
1545 if json {
1546 info!(
1547 "{}",
1548 serde_json::to_string(&serde_json::json!({
1549 "multiaddr": multiaddr,
1550 "added": added,
1551 }))?
1552 );
1553 } else if added {
1554 info!("Added peer: {}", multiaddr);
1555 } else {
1556 info!("Peer already exists: {}", multiaddr);
1557 }
1558
1559 Ok(())
1560}
1561
1562fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1563 let peers_path = shard_dir.join("peers.json");
1564 if peers_path.exists() {
1565 let data = fs::read(peers_path)?;
1566 Ok(serde_json::from_slice(&data)?)
1567 } else {
1568 Ok(Vec::new())
1569 }
1570}
1571
1572fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
1573 shard_dir.join("authorized_keys")
1574}
1575
1576fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1577 let path = authorized_keys_path(shard_dir);
1578 if !path.exists() {
1579 return Ok(Vec::new());
1580 }
1581 let content = fs::read_to_string(&path)?;
1582 let mut keys = Vec::new();
1583 for line in content.lines() {
1584 let line = line.trim();
1585 if line.is_empty() || line.starts_with('#') {
1586 continue;
1587 }
1588 let bytes = hex::decode(line)?;
1589 let arr: [u8; 32] = bytes
1590 .as_slice()
1591 .try_into()
1592 .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1593 keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1594 }
1595 Ok(keys)
1596}
1597
1598pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1599 let bytes = hex::decode(public_key_hex)?;
1601 let arr: [u8; 32] = bytes
1602 .as_slice()
1603 .try_into()
1604 .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1605 let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1606
1607 let path = authorized_keys_path(shard_dir);
1608 let mut content = if path.exists() {
1609 fs::read_to_string(&path)?
1610 } else {
1611 String::new()
1612 };
1613 if content.lines().any(|l| l.trim() == public_key_hex) {
1615 info!("Key already authorized");
1616 return Ok(());
1617 }
1618 content.push_str(public_key_hex);
1619 content.push('\n');
1620 fs::write(&path, content)?;
1621 info!("Authorized key added");
1622 Ok(())
1623}
1624
1625pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
1626 let shard_dir = path.join(".shard");
1627 if !shard_dir.exists() {
1628 anyhow::bail!("not a shard repository (run `shard init` first)");
1629 }
1630 let file = fs::File::create(output)?;
1631 let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1632 let mut archive = tar::Builder::new(encoder);
1633 archive.append_dir_all(".shard", &shard_dir)?;
1634 archive.finish()?;
1635 if json {
1636 info!(
1637 "{}",
1638 serde_json::to_string(&serde_json::json!({
1639 "path": output.to_string_lossy(),
1640 }))?
1641 );
1642 } else {
1643 info!("Backup created: {}", output.display());
1644 }
1645 Ok(())
1646}
1647
1648pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1649 let shard_dir = path.join(".shard");
1650 if !shard_dir.exists() {
1651 anyhow::bail!("not a shard repository (run `shard init` first)");
1652 }
1653 let store = Store::open(&shard_dir)?;
1654 let cipher = maybe_load_cipher(&shard_dir)?;
1655 let commit = load_commit(&store, commit_id)?;
1656 let mut files = Vec::new();
1657 for manifest_id in &commit.manifests {
1658 let data = store.get_chunk(manifest_id)?;
1659 let manifest: FileManifest = metadata::deserialize(&data)?;
1660 let compression = manifest.compression.parse::<Compression>()?;
1661 if !json {
1662 info!("Exporting file: {}", manifest.name);
1663 }
1664 let mut file_data = Vec::new();
1665 for chunk_id in &manifest.chunks {
1666 let chunk_data = store.get_chunk(chunk_id)?;
1667 let decrypted = match &cipher {
1668 Some(c) => c.decrypt(&chunk_data)?,
1669 None => chunk_data,
1670 };
1671 let decompressed = compression.decompress(&decrypted)?;
1672 file_data.extend_from_slice(&decompressed);
1673 }
1674 let out_path = output_dir.join(&manifest.name);
1675 if let Some(parent) = out_path.parent() {
1676 fs::create_dir_all(parent)?;
1677 }
1678 fs::write(&out_path, file_data)?;
1679 if !json {
1680 info!(" -> {}", out_path.display());
1681 }
1682 files.push(manifest.name);
1683 }
1684 if json {
1685 info!(
1686 "{}",
1687 serde_json::to_string(&serde_json::json!({
1688 "commit_id": commit_id,
1689 "files": files,
1690 "output_dir": output_dir.to_string_lossy(),
1691 }))?
1692 );
1693 } else {
1694 info!("Export complete.");
1695 }
1696 Ok(())
1697}
1698
1699pub fn import(
1700 path: &Path,
1701 source_dir: &Path,
1702 message: &str,
1703 author: &str,
1704 json: bool,
1705) -> Result<()> {
1706 let shard_dir = path.join(".shard");
1707 if !shard_dir.exists() {
1708 anyhow::bail!("not a shard repository (run `shard init` first)");
1709 }
1710 let config = load_config(&shard_dir)?;
1712 let compression: Compression = config
1713 .get("compression")
1714 .map(|s| s.as_str())
1715 .unwrap_or("zstd")
1716 .parse()?;
1717 let chunker_mode = chunker::ChunkerMode::from_config(&config);
1718 let fmt = MetadataFormat::from_config(&config);
1719 let store = Store::open(&shard_dir)?;
1720 let cipher = maybe_load_cipher(&shard_dir)?;
1721 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
1722 if !source_dir.is_dir() {
1723 anyhow::bail!("Source must be a directory");
1724 }
1725 for entry in walkdir::WalkDir::new(source_dir)
1726 .into_iter()
1727 .filter_entry(|e| {
1728 e.file_name()
1729 .to_str()
1730 .map(|s| !s.starts_with('.'))
1731 .unwrap_or(false)
1732 })
1733 {
1734 let entry = entry?;
1735 if entry.file_type().is_file() {
1736 add_file(
1737 path,
1738 entry.path(),
1739 &store,
1740 &mut index,
1741 &compression,
1742 &chunker_mode,
1743 cipher.as_ref(),
1744 json,
1745 )?;
1746 }
1747 }
1748 index.save(&shard_dir.join("index"), &fmt)?;
1749 if !index.files.is_empty() {
1751 commit(path, message, author, json)?;
1752 } else if json {
1753 info!(
1754 "{}",
1755 serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
1756 );
1757 } else {
1758 info!("No files found to import.");
1759 }
1760 Ok(())
1761}
1762
1763pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
1764 let shard_dir = path.join(".shard");
1765 if shard_dir.exists() {
1766 anyhow::bail!(
1767 "Repository already exists — remove .shard first or use a different directory"
1768 );
1769 }
1770 let file = fs::File::open(backup_file)?;
1771 let decoder = flate2::read::GzDecoder::new(file);
1772 let mut archive = tar::Archive::new(decoder);
1773 archive.unpack(path)?;
1774 if !path.join(".shard").exists() {
1776 anyhow::bail!("Backup does not contain a valid .shard directory");
1777 }
1778 if json {
1779 info!(
1780 "{}",
1781 serde_json::to_string(&serde_json::json!({
1782 "backup": backup_file.to_string_lossy(),
1783 }))?
1784 );
1785 } else {
1786 info!("Restored from {}", backup_file.display());
1787 }
1788 Ok(())
1789}
1790
1791struct RepoProvider {
1792 store: Store,
1793 shard_dir: std::path::PathBuf,
1794}
1795
1796impl shard_net::p2p::ShardContentProvider for RepoProvider {
1797 fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1798 self.store.get_chunk(id).ok()
1799 }
1800 fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1801 self.store.get_chunk(id).ok()
1802 }
1803 fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1804 let hash = blake3::hash(data);
1805 let hex = hash.to_hex().to_string();
1806 if hex != id {
1807 return false;
1808 }
1809 self.store
1810 .put_chunk(&crate::chunker::Chunk {
1811 hash,
1812 data: data.to_vec(),
1813 offset: 0,
1814 })
1815 .is_ok()
1816 }
1817 fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1818 use ed25519_dalek::Verifier;
1819 let pk_bytes: [u8; 32] = match public_key.try_into() {
1820 Ok(b) => b,
1821 Err(_) => return false,
1822 };
1823 let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1824 Ok(k) => k,
1825 Err(_) => return false,
1826 };
1827 let sig_bytes: [u8; 64] = match signature.try_into() {
1828 Ok(b) => b,
1829 Err(_) => return false,
1830 };
1831 let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1832 if pk.verify(nonce, &sig).is_err() {
1833 return false;
1834 }
1835 if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1837 if !keys.is_empty() {
1838 return keys.contains(&pk);
1839 }
1840 }
1841 true
1842 }
1843 fn repo_public_key(&self) -> Option<Vec<u8>> {
1844 let keys = load_keypair(&self.shard_dir).ok()?;
1845 Some(keys.verifying_key.to_bytes().to_vec())
1846 }
1847}
1848
1849fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
1851 let store = Store::open(shard_dir)?;
1852 let commit = load_commit(&store, commit_id)?;
1853 let mut file_count = 0u64;
1854 let mut total_size = 0u64;
1855 for mid in &commit.manifests {
1856 if let Ok(data) = store.get_chunk(mid) {
1857 if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
1858 file_count += 1;
1859 total_size += m.size;
1860 }
1861 }
1862 }
1863 Ok((file_count, total_size))
1864}
1865
1866pub async fn share(path: &Path, json: bool) -> Result<()> {
1867 let shard_dir = path.join(".shard");
1868 if !shard_dir.exists() {
1869 anyhow::bail!("not a shard repository (run `shard init` first)");
1870 }
1871
1872 let mut node = shard_net::p2p::Node::new().await?;
1873
1874 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1876 node.subscribe(&ann_topic)?;
1877
1878 let peers = load_peers(&shard_dir)?;
1880 for peer in peers {
1881 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1882 let _ = node.swarm.dial(addr);
1883 }
1884 }
1885
1886 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
1888
1889 node.listen("/ip4/0.0.0.0/tcp/0").await?;
1890 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1891 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1892
1893 let config = load_config(&shard_dir)?;
1894 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1895
1896 let store = Store::open(&shard_dir)?;
1897 let provider = RepoProvider {
1898 store,
1899 shard_dir: shard_dir.clone(),
1900 };
1901
1902 if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
1904 let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
1905 let ann = shard_net::protocol::Announcement {
1906 commit_id: head,
1907 file_count,
1908 total_size,
1909 repo_name: repo_name.clone(),
1910 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
1911 };
1912 let payload = serde_json::to_vec(&ann)?;
1913 let _ = node.publish(&ann_topic, payload);
1914 }
1915
1916 if json {
1917 info!(
1918 "{}",
1919 serde_json::to_string(&serde_json::json!({
1920 "status": "sharing",
1921 "peer_id": node.local_peer_id().to_string(),
1922 }))?
1923 );
1924 } else {
1925 info!("Sharing repository...");
1926 }
1927 node.run(provider).await;
1928
1929 Ok(())
1930}
1931
1932pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
1935 let mut node = shard_net::p2p::Node::new().await?;
1936 node.listen(listen_addr).await?;
1937 if json {
1938 info!(
1939 "{}",
1940 serde_json::to_string(&serde_json::json!({
1941 "status": "relay active",
1942 "listen": listen_addr,
1943 "peer_id": node.local_peer_id().to_string(),
1944 }))?
1945 );
1946 } else {
1947 info!("Relay server active on {}", listen_addr);
1948 info!("Peer ID: {}", node.local_peer_id());
1949 info!("Ready to accept circuit relay v2 reservations");
1950 }
1951 node.run(EmptyProvider).await;
1952 Ok(())
1953}
1954
1955struct EmptyProvider;
1957impl shard_net::p2p::ShardContentProvider for EmptyProvider {
1958 fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
1959 None
1960 }
1961 fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
1962 None
1963 }
1964 fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
1965 false
1966 }
1967 fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
1968 false
1969 }
1970 fn repo_public_key(&self) -> Option<Vec<u8>> {
1971 None
1972 }
1973}
1974
1975pub async fn sync(path: &Path, _json: bool) -> Result<()> {
1976 let shard_dir = path.join(".shard");
1977 if !shard_dir.exists() {
1978 anyhow::bail!("not a shard repository (run `shard init` first)");
1979 }
1980
1981 let config = load_config(&shard_dir)?;
1982 let repo_id = config.get("repo_id").cloned().unwrap_or_default();
1983 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1984 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1985 let repo_topic =
1986 shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
1987
1988 let mut node = shard_net::p2p::Node::new().await?;
1989 node.subscribe(&ann_topic)?;
1990 node.subscribe(&repo_topic)?;
1991 node.listen("/ip4/0.0.0.0/tcp/0").await?;
1992 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1993 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1994
1995 let peers = load_peers(&shard_dir)?;
1997 for peer in peers {
1998 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1999 let _ = node.swarm.dial(addr);
2000 }
2001 }
2002
2003 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2005
2006 let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2008 if let Ok(payload) = serde_json::to_vec(ann) {
2009 let _ = node.publish(&ann_topic, payload.clone());
2010 let _ = node.publish(&repo_topic, payload);
2011 }
2012 };
2013
2014 let head_commit = branch::resolve_head(&shard_dir)?.1;
2015
2016 if let Some(ref head) = head_commit {
2018 let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2019 let ann = shard_net::protocol::Announcement {
2020 commit_id: head.clone(),
2021 file_count,
2022 total_size,
2023 repo_name: repo_name.clone(),
2024 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2025 };
2026 publish_ann(&mut node, &ann);
2027 if !_json {
2028 info!("Announced commit {} on sync topic", head)
2029 }
2030 } else if !_json {
2031 info!("No commits to announce");
2032 }
2033
2034 if !_json {
2035 info!("Syncing on topic with peer id: {}", node.local_peer_id());
2036 }
2037 let _ = std::io::stdout().flush();
2038
2039 let store = Store::open(&shard_dir)?;
2040 let mut provider = RepoProvider {
2041 store,
2042 shard_dir: shard_dir.clone(),
2043 };
2044
2045 let mut interval = tokio::time::interval(Duration::from_secs(5));
2046 let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2047 HashMap::new();
2048 let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2049 let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2050 let path_buf = path.to_path_buf();
2051
2052 loop {
2053 tokio::select! {
2054 _ = tokio::signal::ctrl_c() => {
2055 info!("\nSync shutting down...");
2056 break Ok(());
2057 }
2058 _ = interval.tick() => {
2059 request_counts.clear();
2061 announce_counts.clear();
2062 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2063 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2064 let ann = shard_net::protocol::Announcement {
2065 commit_id: head.clone(),
2066 file_count: fc,
2067 total_size: ts,
2068 repo_name: repo_name.clone(),
2069 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2070 };
2071 publish_ann(&mut node, &ann);
2072 info!("Re-announced commit {} on sync topic", head);
2073 }
2074 }
2075 event = node.swarm.select_next_some() => {
2076 match event {
2077 shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2078 info!("Listening on {address:?}");
2079 let _ = std::io::stdout().flush();
2080 }
2081 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2082 shard_net::p2p::ShardBehaviourEvent::Mdns(
2083 shard_net::libp2p::mdns::Event::Discovered(list),
2084 ),
2085 ) => {
2086 for (peer_id, multiaddr) in list {
2087 info!("mDNS discovered: {peer_id} {multiaddr}");
2088 address_book.entry(peer_id).or_default().push(multiaddr.clone());
2089 node.swarm
2090 .behaviour_mut()
2091 .gossipsub
2092 .add_explicit_peer(&peer_id);
2093 node.swarm
2094 .behaviour_mut()
2095 .kademlia
2096 .add_address(&peer_id, multiaddr);
2097 }
2098 }
2099 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2100 shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2101 list,
2102 )),
2103 ) => {
2104 for (peer_id, _multiaddr) in list {
2105 info!("mDNS expired: {peer_id}");
2106 node.swarm
2107 .behaviour_mut()
2108 .gossipsub
2109 .remove_explicit_peer(&peer_id);
2110 }
2111 }
2112 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2113 shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2114 shard_net::libp2p::gossipsub::Event::Message {
2115 propagation_source,
2116 message,
2117 ..
2118 },
2119 ),
2120 ) => {
2121 if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2123 let peer = propagation_source;
2124 let commit_id_owned = ann.commit_id;
2125 info!(
2126 "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2127 peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2128 );
2129 if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2131 if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2132 address_book.entry(peer).or_default().push(addr);
2133 }
2134 }
2135 let rate_key = (peer, commit_id_owned.clone());
2137 if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2138 warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2139 } else {
2140 *announce_counts.entry(rate_key).or_insert(0) += 1;
2141 let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2143 if our_head != commit_id_owned {
2144 let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2145 let reply = shard_net::protocol::Announcement {
2146 commit_id: our_head,
2147 file_count: fc,
2148 total_size: ts,
2149 repo_name: repo_name.clone(),
2150 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2151 };
2152 publish_ann(&mut node, &reply);
2153 }
2154 if let Some(addrs) = address_book.get(&peer) {
2155 if let Some(addr) = addrs.first() {
2156 let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2157 let path_clone = path_buf.clone();
2158 tokio::spawn(async move {
2159 match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2160 Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2161 Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2162 }
2163 });
2164 }
2165 }
2166 }
2167 }
2168 }
2169 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2170 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2171 shard_net::libp2p::request_response::Event::Message { peer, message },
2172 ),
2173 ) => {
2174 if let shard_net::libp2p::request_response::Message::Request {
2175 request, channel, ..
2176 } = message
2177 {
2178 let req_count = request_counts.entry(peer).or_insert(0u32);
2180 *req_count += 1;
2181 if *req_count > 50 {
2182 warn!("Dropping request from {}: rate limit exceeded", peer);
2183 } else {
2185 info!("Received request from {}", peer);
2186 node.serve_request(&peer, &mut provider, request, channel);
2187 }
2188 } else {
2189 info!("Received Response from {}", peer);
2190 }
2191 }
2192 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2193 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2194 shard_net::libp2p::request_response::Event::OutboundFailure {
2195 peer, error, ..
2196 },
2197 ),
2198 ) => {
2199 error!("Outbound failure to {}: {:?}", peer, error);
2200 }
2201 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2202 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2203 shard_net::libp2p::request_response::Event::InboundFailure {
2204 peer, error, ..
2205 },
2206 ),
2207 ) => {
2208 error!("Inbound failure from {}: {:?}", peer, error);
2209 }
2210 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2211 shard_net::p2p::ShardBehaviourEvent::Identify(
2212 shard_net::libp2p::identify::Event::Received { peer_id, info },
2213 ),
2214 ) => {
2215 info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2216 for addr in info.listen_addrs {
2217 address_book.entry(peer_id).or_default().push(addr);
2218 }
2219 let _ = std::io::stdout().flush();
2220 }
2221 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2222 shard_net::p2p::ShardBehaviourEvent::Identify(event),
2223 ) => {
2224 info!("Identify event: {:?}", event);
2225 }
2226 shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2227 info!("Connection established with {}", peer_id);
2228 if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2231 address_book.entry(peer_id).or_default().push(address.clone());
2232 }
2233 node.swarm
2234 .behaviour_mut()
2235 .gossipsub
2236 .add_explicit_peer(&peer_id);
2237 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2239 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2240 let ann = shard_net::protocol::Announcement {
2241 commit_id: head.clone(),
2242 file_count: fc,
2243 total_size: ts,
2244 repo_name: repo_name.clone(),
2245 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2246 };
2247 publish_ann(&mut node, &ann);
2248 }
2249 }
2250 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2251 shard_net::p2p::ShardBehaviourEvent::Relay(event),
2252 ) => {
2253 info!("Relay event: {:?}", event);
2254 }
2255 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2256 shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2257 ) => {
2258 info!("DCUtR event: {:?}", event);
2259 }
2260 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2261 shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2262 ) => {
2263 info!("AutoNAT event: {:?}", event);
2264 }
2265 shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2266 local_addr,
2267 send_back_addr,
2268 ..
2269 } => {
2270 info!(
2271 "Incoming connection from {} to {}",
2272 send_back_addr, local_addr
2273 );
2274 }
2275 e => {
2276 info!("Event: {:?}", e);
2277 }
2278 }
2279 }
2280 }
2281 }
2282}
2283
2284pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2285 let shard_dir = path.join(".shard");
2286 if !shard_dir.exists() {
2289 init(path, "flat", "zstd", "fixed", None, false, false)?;
2290 }
2291
2292 let store = Store::open(&shard_dir)?;
2293 let cipher = maybe_load_cipher(&shard_dir)?;
2294
2295 let mut node = shard_net::p2p::Node::new().await?;
2296
2297 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2299 let peer_id = match multiaddr.iter().last() {
2300 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2301 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2302 };
2303
2304 if !json {
2306 info!("Pulling commit {} from {}...", commit_id, peer);
2307 }
2308 let commit_data = node
2309 .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2310 .await?;
2311 let hash = blake3::hash(&commit_data);
2312 if hash.to_hex().to_string() != commit_id {
2313 anyhow::bail!("Commit hash mismatch");
2314 }
2315 let chunk = crate::chunker::Chunk {
2316 hash,
2317 data: commit_data.clone(),
2318 offset: 0,
2319 };
2320 store.put_chunk(&chunk)?;
2321
2322 let commit: Commit = metadata::deserialize(&commit_data)?;
2323 if !json {
2324 info!("Got commit: {}", commit.message);
2325 }
2326
2327 let keys_dir = shard_dir.join("keys");
2329 if let Some(kid) = &commit.key_id {
2330 if keys_dir.join("records").exists() {
2331 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2332 let missing_rotations: Vec<&KeyRotation> = chain
2333 .iter()
2334 .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2335 .collect();
2336 if !missing_rotations.is_empty() {
2337 if !json {
2338 info!(
2339 "Fetching {} key rotation records from peer...",
2340 missing_rotations.len()
2341 );
2342 }
2343 let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2344 missing_rotations
2345 .iter()
2346 .map(|r| {
2347 (
2348 r.rotation_id.clone(),
2349 shard_net::protocol::ShardRequest::GetChunk(
2350 r.rotation_id.clone(),
2351 ),
2352 )
2353 })
2354 .collect();
2355 if let Ok(rot_results) = node
2356 .request_parallel(&multiaddr, peer_id, rot_requests)
2357 .await
2358 {
2359 for (rot_id, rot_data) in &rot_results {
2360 let rh = blake3::hash(rot_data);
2361 if rh.to_hex().to_string() != *rot_id {
2362 info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2363 continue;
2364 }
2365 store.put_chunk(&crate::chunker::Chunk {
2366 hash: rh,
2367 data: rot_data.clone(),
2368 offset: 0,
2369 })?;
2370 }
2371 if !json {
2372 info!("Key rotation records synced from peer.");
2373 }
2374 }
2375 }
2376 }
2377 }
2378 }
2379
2380 if let Some(pk_hex) = &commit.public_key {
2382 let pk_bytes = hex::decode(pk_hex)?;
2383 let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2384 let mut config = load_config(&shard_dir)?;
2385 config.insert("repo_id".to_string(), repo_id);
2386 save_config(&shard_dir, &config)?;
2387 }
2388
2389 let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2391 .manifests
2392 .iter()
2393 .map(|id| {
2394 (
2395 id.clone(),
2396 shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2397 )
2398 })
2399 .collect();
2400 let manifest_results = node
2401 .request_parallel(&multiaddr, peer_id, manifest_requests)
2402 .await?;
2403
2404 let mut all_chunk_ids: Vec<String> = Vec::new();
2405 let mut file_manifests: Vec<FileManifest> = Vec::new();
2406 let mut chunk_compression: HashMap<String, String> = HashMap::new();
2408
2409 for (manifest_id, manifest_data) in &manifest_results {
2410 let hash = blake3::hash(manifest_data);
2411 if hash.to_hex().to_string() != *manifest_id {
2412 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2413 }
2414 let chunk = crate::chunker::Chunk {
2415 hash,
2416 data: manifest_data.clone(),
2417 offset: 0,
2418 };
2419 store.put_chunk(&chunk)?;
2420 let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2421 if !json {
2422 info!(
2423 "Fetching file: {} (compression: {})",
2424 manifest.name, manifest.compression
2425 );
2426 }
2427 for cid in &manifest.chunks {
2428 chunk_compression.insert(cid.clone(), manifest.compression.clone());
2429 }
2430 all_chunk_ids.extend(manifest.chunks.clone());
2431 file_manifests.push(manifest);
2432 }
2433
2434 let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2436 let partial_chunks = partial.list_chunks()?;
2437 let mut recovered = 0usize;
2438 for chunk_id in &partial_chunks {
2439 if let Ok(data) = partial.load_chunk(chunk_id) {
2440 let hash = blake3::hash(&data);
2441 if hash.to_hex().to_string() == *chunk_id {
2442 let chunk = crate::chunker::Chunk {
2444 hash,
2445 data: data.clone(),
2446 offset: 0,
2447 };
2448 if store.put_chunk(&chunk).is_ok() {
2449 recovered += 1;
2450 }
2451 } else {
2452 let _ = partial.remove_chunk(chunk_id);
2454 }
2455 }
2456 }
2457 if recovered > 0 {
2458 info!("Recovered {} chunks from partial transfer", recovered);
2459 }
2460
2461 let needed_chunks: Vec<String> = all_chunk_ids
2463 .into_iter()
2464 .filter(|id| store.get_chunk(id).is_err())
2465 .collect();
2466
2467 if !needed_chunks.is_empty() {
2468 if !json {
2469 info!("Fetching {} chunks...", needed_chunks.len());
2470 }
2471 let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2472 .iter()
2473 .map(|id| {
2474 (
2475 id.clone(),
2476 shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2477 )
2478 })
2479 .collect();
2480 let chunk_results = node
2481 .request_parallel(&multiaddr, peer_id, chunk_requests)
2482 .await?;
2483 for (chunk_id, chunk_data) in &chunk_results {
2484 let compression: Compression = chunk_compression
2486 .get(chunk_id)
2487 .map(|s| s.as_str())
2488 .unwrap_or("none")
2489 .parse()?;
2490 let decrypted = match &cipher {
2492 Some(c) => c.decrypt(chunk_data)?,
2493 None => chunk_data.clone(),
2494 };
2495 let decompressed = compression.decompress(&decrypted)?;
2496 let hash = blake3::hash(&decompressed);
2497 if hash.to_hex().to_string() != *chunk_id {
2498 anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2499 }
2500 let chunk = crate::chunker::Chunk {
2502 hash,
2503 data: chunk_data.clone(),
2504 offset: 0,
2505 };
2506 store.put_chunk(&chunk)?;
2507 partial.save_chunk(chunk_id, chunk_data)?;
2509 }
2510 }
2511
2512 for manifest in &file_manifests {
2514 let compression = manifest.compression.parse::<Compression>()?;
2515 let mut file_data = Vec::new();
2516 for chunk_id in &manifest.chunks {
2517 let stored = store.get_chunk(chunk_id)?;
2518 let decrypted = match &cipher {
2519 Some(c) => c.decrypt(&stored)?,
2520 None => stored,
2521 };
2522 let decompressed = compression.decompress(&decrypted)?;
2523 file_data.extend_from_slice(&decompressed);
2524 }
2525 fs::write(path.join(&manifest.name), file_data)?;
2526 if !json {
2527 info!(
2528 "Reconstructed file: {} ({} bytes)",
2529 manifest.name, manifest.size
2530 );
2531 }
2532 }
2533
2534 partial.cleanup()?;
2536
2537 if json {
2538 info!(
2539 "{}",
2540 serde_json::to_string(&serde_json::json!({
2541 "status": "pull complete",
2542 "commit_id": commit_id,
2543 }))?
2544 );
2545 } else {
2546 info!("Pull complete.");
2547 }
2548 Ok(())
2549}
2550
2551pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2552 let shard_dir = path.join(".shard");
2553 if !shard_dir.exists() {
2554 anyhow::bail!("not a shard repository (run `shard init` first)");
2555 }
2556 let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2557 if json {
2558 info!("{}", serde_json::to_string(&transfers)?);
2559 } else {
2560 if transfers.is_empty() {
2561 info!("No incomplete transfers.");
2562 } else {
2563 for t in &transfers {
2564 info!("Incomplete transfer: {}", t);
2565 }
2566 }
2567 }
2568 Ok(())
2569}
2570
2571pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2572 let shard_dir = path.join(".shard");
2573 if !shard_dir.exists() {
2574 anyhow::bail!("not a shard repository (run `shard init` first)");
2575 }
2576 partial::remove_transfer(&shard_dir, commit_id)?;
2577 info!("Removed transfer tracking for {}", commit_id);
2578 Ok(())
2579}
2580
2581pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
2582 let shard_dir = path.join(".shard");
2583 if !shard_dir.exists() {
2584 anyhow::bail!("not a shard repository (run `shard init` first)");
2585 }
2586
2587 let (_, head_id) = branch::resolve_head(&shard_dir)?;
2588 let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
2589
2590 let store = Store::open(&shard_dir)?;
2591
2592 let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
2594 std::collections::BTreeMap::new();
2595
2596 let mut seen = std::collections::HashSet::new();
2598 let mut stack = vec![head_id.clone()];
2599 while let Some(cid) = stack.pop() {
2600 if !seen.insert(cid.clone()) {
2601 continue;
2602 }
2603 if let Ok(data) = store.get_chunk(&cid) {
2604 objects.insert(cid, data.clone());
2605 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
2606 if let Some(kid) = &commit.key_id {
2608 let keys_dir = shard_dir.join("keys");
2609 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2610 for rot in &chain {
2611 let rj = serde_json::to_vec(rot)?;
2612 let rh = blake3::hash(&rj);
2613 if !store.has_chunk(rh.to_hex().as_ref()) {
2614 store.put_chunk(&crate::chunker::Chunk {
2615 hash: rh,
2616 data: rj.clone(),
2617 offset: 0,
2618 })?;
2619 }
2620 objects.insert(rot.rotation_id.clone(), rj);
2621 }
2622 }
2623 }
2624 for mid in &commit.manifests {
2625 if let Ok(manifest_data) = store.get_chunk(mid) {
2626 objects.insert(mid.clone(), manifest_data.clone());
2627 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
2628 {
2629 for cid in &manifest.chunks {
2630 if let Ok(chunk_data) = store.get_chunk(cid) {
2631 objects.insert(cid.clone(), chunk_data);
2632 }
2633 }
2634 }
2635 }
2636 }
2637 for parent in &commit.parents {
2638 stack.push(parent.clone());
2639 }
2640 }
2641 }
2642 }
2643
2644 if !json {
2645 info!(
2646 "Pushing {} objects ({} bytes)...",
2647 objects.len(),
2648 objects.values().map(|v| v.len() as u64).sum::<u64>()
2649 );
2650 }
2651
2652 let mut node = shard_net::p2p::Node::new().await?;
2654 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2655 let peer_id = match multiaddr.iter().last() {
2656 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2657 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2658 };
2659
2660 for (id, data) in &objects {
2661 node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
2662 .await?;
2663 }
2664
2665 if json {
2666 info!(
2667 "{}",
2668 serde_json::to_string(&serde_json::json!({
2669 "status": "push complete",
2670 "objects": objects.len(),
2671 }))?
2672 );
2673 } else {
2674 info!("Push complete ({} objects).", objects.len());
2675 }
2676 Ok(())
2677}
2678
2679pub fn key_rotate(path: &Path) -> Result<()> {
2682 let shard_dir = path.join(".shard");
2683 if !shard_dir.exists() {
2684 anyhow::bail!("not a shard repository (run `shard init` first)");
2685 }
2686 let keys_dir = shard_dir.join("keys");
2687 let rotation = keychain::rotate_signing_key(&keys_dir)?;
2688
2689 if let Some(global) = global_keys_dir() {
2691 fs::create_dir_all(&global).ok();
2692 if let Ok(new_keys) = KeyPair::load(&keys_dir) {
2693 let _ = new_keys.save(&global);
2694 }
2695 }
2696
2697 let store = Store::open(&shard_dir)?;
2699 let json = serde_json::to_vec(&rotation)?;
2700 let hash = blake3::hash(&json);
2701 if !store.has_chunk(hash.to_hex().as_ref()) {
2702 store.put_chunk(&crate::chunker::Chunk {
2703 hash,
2704 data: json,
2705 offset: 0,
2706 })?;
2707 }
2708
2709 let rotations = keychain::load_rotations(&keys_dir)?;
2711 for rot in &rotations {
2712 let rj = serde_json::to_vec(rot)?;
2713 let rh = blake3::hash(&rj);
2714 if !store.has_chunk(rh.to_hex().as_ref()) {
2715 store.put_chunk(&crate::chunker::Chunk {
2716 hash: rh,
2717 data: rj,
2718 offset: 0,
2719 })?;
2720 }
2721 }
2722
2723 info!(
2724 "Key rotated: {} -> {}",
2725 rotation.old_key_id, rotation.new_key_id
2726 );
2727 info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
2728 Ok(())
2729}
2730
2731pub fn key_list(path: &Path, json: bool) -> Result<()> {
2733 let shard_dir = path.join(".shard");
2734 if !shard_dir.exists() {
2735 anyhow::bail!("not a shard repository (run `shard init` first)");
2736 }
2737 let keys_dir = shard_dir.join("keys");
2738 let records = keychain::load_records(&keys_dir)?;
2739 let current_id = keychain::get_current_key_id(&keys_dir)?;
2740
2741 if json {
2742 info!(
2743 "{}",
2744 serde_json::to_string_pretty(&serde_json::json!({
2745 "current": current_id,
2746 "records": &records,
2747 }))?
2748 );
2749 } else {
2750 info!("Current key: {}", current_id);
2751 info!("Key history:");
2752 for record in &records {
2753 let marker = if record.key_id == current_id {
2754 " (active)"
2755 } else {
2756 ""
2757 };
2758 info!(
2759 " {} created_at={}{}",
2760 record.key_id, record.created_at, marker
2761 );
2762 if let Some(prev) = &record.previous_key_id {
2763 info!(" previous: {}", prev);
2764 }
2765 }
2766 }
2767 Ok(())
2768}
2769
2770pub fn key_verify(path: &Path, json: bool) -> Result<()> {
2772 let shard_dir = path.join(".shard");
2773 if !shard_dir.exists() {
2774 anyhow::bail!("not a shard repository (run `shard init` first)");
2775 }
2776 let keys_dir = shard_dir.join("keys");
2777 let errors = keychain::verify_keychain(&keys_dir)?;
2778
2779 if json {
2780 info!(
2781 "{}",
2782 serde_json::to_string(&serde_json::json!({
2783 "verified": errors.is_empty(),
2784 "errors": errors,
2785 }))?
2786 );
2787 } else {
2788 if errors.is_empty() {
2789 info!("Keychain verification successful.");
2790 } else {
2791 for err in &errors {
2792 error!("Keychain error: {}", err);
2793 }
2794 anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
2795 }
2796 }
2797 Ok(())
2798}