1pub mod api;
2pub mod branch;
3pub mod chunker;
4pub mod commit;
5pub mod compression;
6pub mod config;
7pub mod encryption;
8pub mod gc;
9pub mod index;
10pub mod keychain;
11pub mod manifest;
12pub mod metadata;
13pub mod metrics;
14pub mod ops;
15pub mod partial;
16pub mod store;
17pub mod wal;
18
19use crate::commit::Commit;
20use crate::compression::Compression;
21use crate::index::Index;
22use crate::keychain::KeyRotation;
23use crate::manifest::FileManifest;
24use crate::store::Store;
25use anyhow::Result;
26use ed25519_dalek::{Signer, Verifier};
27use metadata::MetadataFormat;
28use serde::Serialize;
29use shard_crypto::KeyPair;
30use shard_net::libp2p::futures::StreamExt;
31use similar::TextDiff;
32use std::collections::HashMap;
33use std::fs;
34use std::io::Write;
35use std::path::Path;
36use std::path::PathBuf;
37use std::sync::Mutex;
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39use tracing::{error, info, warn};
40
/// Process-wide cache of the key passphrase, written by `cache_passphrase` (which
/// `init_with_passphrase` calls when given a non-empty passphrase) and read by
/// `get_cached_passphrase`.
///
/// NOTE(review): the passphrase is kept as a plain `String` for the life of the process
/// and is never zeroized — confirm this is acceptable for the threat model.
static PASSPHRASE_CACHE: Mutex<Option<String>> = Mutex::new(None);

/// Lazily created registry of per-repository operation queues, keyed by the repository
/// path passed to `get_or_create`. Repository operations `acquire` a read or write slot
/// from it (`ops::OpKind::Read` / `ops::OpKind::Write`) before touching `.shard`.
static OP_QUEUES: once_cell::sync::Lazy<ops::RepoOpQueues> =
    once_cell::sync::Lazy::new(ops::RepoOpQueues::new);
45
46#[allow(dead_code)]
47fn with_write_op<T>(path: &Path, desc: &str, f: impl FnOnce() -> Result<T>) -> Result<T> {
48 let trace_id = ops::generate_trace_id();
49 ops::set_trace_id(&trace_id);
50 let guard = OP_QUEUES
51 .get_or_create(&path.to_path_buf())
52 .acquire(ops::OpKind::Write, desc.to_string());
53 OP_QUEUES
54 .get_or_create(&path.to_path_buf())
55 .wait_for_turn(&guard);
56 let result = f();
57 if let Err(ref e) = result {
58 crate::metrics::METRICS
59 .errors_total
60 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
61 ops::traced_warn(format!("op {} failed: {}", desc, e));
62 }
63 drop(guard);
64 ops::set_trace_id("");
65 result
66}
67
68#[allow(dead_code)]
69fn with_read_op<T>(path: &Path, desc: &str, f: impl FnOnce() -> Result<T>) -> Result<T> {
70 let trace_id = ops::generate_trace_id();
71 ops::set_trace_id(&trace_id);
72 let guard = OP_QUEUES
73 .get_or_create(&path.to_path_buf())
74 .acquire(ops::OpKind::Read, desc.to_string());
75 OP_QUEUES
76 .get_or_create(&path.to_path_buf())
77 .wait_for_turn(&guard);
78 let result = f();
79 drop(guard);
80 ops::set_trace_id("");
81 result
82}
83
84pub fn cache_passphrase(passphrase: &str) -> Result<()> {
85 let mut cache = PASSPHRASE_CACHE.lock().unwrap();
86 *cache = Some(passphrase.to_string());
87 Ok(())
88}
89
90fn get_cached_passphrase() -> Option<String> {
91 PASSPHRASE_CACHE.lock().unwrap().clone()
92}
93
/// Reads the ignore patterns from `<path>/.shardignore`.
///
/// Each line is trimmed, and blank lines and `#` comment lines are dropped. A missing
/// file, or one that cannot be read as UTF-8 text, yields an empty list.
fn load_shardignore(path: &Path) -> Vec<String> {
    let ignore_file = path.join(".shardignore");
    if !ignore_file.exists() {
        return Vec::new();
    }

    let contents = fs::read_to_string(&ignore_file).unwrap_or_default();
    let mut patterns = Vec::new();
    for line in contents.lines().map(str::trim) {
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        patterns.push(line.to_string());
    }
    patterns
}
107
/// Returns `true` when `path` is excluded by any `.shardignore` pattern.
///
/// `path` is expected to be relative to the directory the patterns refer to. A
/// trailing `/` on a pattern is ignored. Supported forms:
/// * `*` — ignores everything.
/// * `**/<p>` — the components of `<p>` match a contiguous run of path components
///   anywhere in the path.
/// * `/<p>` — anchored: the components of `<p>` must match the *leading* components.
/// * `<p>` — the components of `<p>` must match the *trailing* components.
///
/// Matching is per path component, never on raw substrings, so `**/build` matches
/// `src/build/out.o` but not `rebuild.txt`. Within a component, `*` matches any
/// (possibly empty) run of characters, so `*.log` matches `logs/app.log`.
fn matches_ignore(path: &Path, patterns: &[String]) -> bool {
    // Where a pattern's components must sit within the path's components.
    enum Anchor {
        Anywhere,
        Root,
        End,
    }

    // Matches one path component against one pattern component, where `*` matches
    // any run of characters. Iterative with backtracking to the most recent `*`.
    fn component_matches(pattern: &str, text: &str) -> bool {
        let p: Vec<char> = pattern.chars().collect();
        let t: Vec<char> = text.chars().collect();
        let (mut pi, mut ti) = (0usize, 0usize);
        // (index of the last `*` in `p`, index in `t` that `*` has absorbed up to)
        let mut backtrack: Option<(usize, usize)> = None;
        while ti < t.len() {
            if pi < p.len() && p[pi] == '*' {
                backtrack = Some((pi, ti));
                pi += 1;
            } else if pi < p.len() && p[pi] == t[ti] {
                pi += 1;
                ti += 1;
            } else if let Some((star, mark)) = backtrack {
                // Let the last `*` swallow one more character and retry.
                backtrack = Some((star, mark + 1));
                pi = star + 1;
                ti = mark + 1;
            } else {
                return false;
            }
        }
        p[pi..].iter().all(|&c| c == '*')
    }

    // Component-wise comparison: the previous substring check let `**/build` ignore
    // `rebuild.txt`, and components are independent of the separator in use.
    let components: Vec<String> = path
        .components()
        .map(|c| c.as_os_str().to_string_lossy().into_owned())
        .collect();

    patterns.iter().any(|raw| {
        let pattern = raw.trim_end_matches('/');
        if pattern == "*" {
            return true;
        }
        let (anchor, body) = match (pattern.strip_prefix("**/"), pattern.strip_prefix('/')) {
            (Some(rest), _) => (Anchor::Anywhere, rest),
            (None, Some(rest)) => (Anchor::Root, rest),
            (None, None) => (Anchor::End, pattern),
        };
        let needle: Vec<&str> = body.split('/').filter(|s| !s.is_empty()).collect();
        // An empty pattern (e.g. a lone `/`) must not match the repository root.
        if needle.is_empty() || needle.len() > components.len() {
            return false;
        }
        let fits = |window: &[String]| {
            window
                .iter()
                .zip(&needle)
                .all(|(comp, pat)| component_matches(pat, comp))
        };
        match anchor {
            Anchor::Anywhere => components.windows(needle.len()).any(fits),
            Anchor::Root => fits(&components[..needle.len()]),
            Anchor::End => fits(&components[components.len() - needle.len()..]),
        }
    })
}
125
126pub fn init(
127 path: &Path,
128 backend: &str,
129 compression_algo: &str,
130 chunker_mode: &str,
131 chunk_size: Option<u64>,
132 is_private: bool,
133 json: bool,
134) -> Result<()> {
135 init_with_passphrase(
136 path,
137 backend,
138 compression_algo,
139 chunker_mode,
140 chunk_size,
141 is_private,
142 json,
143 "",
144 )
145}
146
147#[allow(clippy::too_many_arguments)]
148pub fn init_with_passphrase(
149 path: &Path,
150 backend: &str,
151 compression_algo: &str,
152 chunker_mode: &str,
153 chunk_size: Option<u64>,
154 is_private: bool,
155 json: bool,
156 passphrase: &str,
157) -> Result<()> {
158 crate::metrics::METRICS
159 .ops_init
160 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
161 let _guard = OP_QUEUES
162 .get_or_create(&path.to_path_buf())
163 .acquire(ops::OpKind::Write, "init".to_string());
164 let shard_dir = path.join(".shard");
165 if shard_dir.exists() {
166 anyhow::bail!(
167 "repository already initialized at {} (run `shard status` to confirm)",
168 shard_dir.display()
169 );
170 }
171 fs::create_dir_all(shard_dir.join("objects"))?;
172 fs::create_dir_all(shard_dir.join("keys"))?;
173 fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
174 branch::set_head_branch(&shard_dir, "main")?;
175
176 let keys = KeyPair::generate();
177 keys.save_with_passphrase(&shard_dir.join("keys"), passphrase)?;
178 if !passphrase.is_empty() {
179 cache_passphrase(passphrase)?;
180 }
181 if let Some(global) = global_keys_dir() {
182 fs::create_dir_all(&global).ok();
183 let _ = keys.save_with_passphrase(&global, passphrase);
184 let _ = keychain::init_keychain(&global);
185 }
186 keychain::init_keychain(&shard_dir.join("keys"))?;
187
188 let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
191 let repo_id = blake3::hash(&pubkey).to_hex().to_string();
192 let mut config = load_config(&shard_dir)?;
193
194 if is_private {
195 let key = encryption::generate_repo_key();
196 encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
197 config.insert("private".to_string(), "true".to_string());
198 }
199 config.insert("repo_id".to_string(), repo_id);
200 config.insert("storage_backend".to_string(), backend.to_string());
201 config.insert(
202 "serialization_format".to_string(),
203 MetadataFormat::Json.config_value().to_string(),
204 );
205 config.insert("compression".to_string(), compression_algo.to_string());
206 config.insert("chunker_mode".to_string(), chunker_mode.to_string());
207 match chunker_mode {
208 "rabin" => {
209 let chunk_size = chunk_size.unwrap_or(4_194_304);
210 let min = chunk_size / 4;
211 let max = chunk_size * 2;
212 config.insert("chunk_min".to_string(), min.to_string());
213 config.insert("chunk_avg".to_string(), chunk_size.to_string());
214 config.insert("chunk_max".to_string(), max.to_string());
215 }
216 _ => {
217 let cs = chunk_size.unwrap_or(4_194_304);
218 config.insert("chunk_size".to_string(), cs.to_string());
219 }
220 }
221 save_config(&shard_dir, &config)?;
222
223 let chunker_desc = if chunker_mode == "rabin" {
224 format!(
225 "rabin (avg {} bytes)",
226 config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
227 )
228 } else {
229 format!(
230 "fixed ({} bytes)",
231 config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
232 )
233 };
234 if json {
235 info!(
236 "{}",
237 serde_json::to_string(&serde_json::json!({
238 "path": shard_dir.display().to_string(),
239 "backend": backend,
240 "compression": compression_algo,
241 "chunker": chunker_desc,
242 "private": is_private,
243 }))?
244 );
245 } else {
246 info!(
247 "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
248 shard_dir.display(),
249 backend,
250 compression_algo,
251 chunker_desc,
252 );
253 }
254 Ok(())
255}
256
/// Returns `file_path` relative to `repo_root`, as a lossily-decoded string.
///
/// Both paths are canonicalized when possible; a path that cannot be canonicalized
/// (e.g. it does not exist) is used as given. If `file_path` is not under
/// `repo_root`, only its final component is returned (or `""` if it has none).
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let canonical = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let repo = canonical(repo_root);
    let file = canonical(file_path);

    match file.strip_prefix(&repo) {
        Ok(rel) => rel.to_string_lossy().into_owned(),
        Err(_) => file_path
            .file_name()
            .map_or_else(String::new, |name| name.to_string_lossy().into_owned()),
    }
}
273
/// Guesses a MIME type from the file extension (case-insensitive).
///
/// Returns `None` when the path has no extension, the extension is not valid UTF-8,
/// or the extension is not in the table below.
fn detect_content_type(file_path: &Path) -> Option<String> {
    const MIME_TABLE: &[(&[&str], &str)] = &[
        (&["txt"], "text/plain"),
        (&["json"], "application/json"),
        (&["csv"], "text/csv"),
        (&["png"], "image/png"),
        (&["jpg", "jpeg"], "image/jpeg"),
        (&["gif"], "image/gif"),
        (&["pdf"], "application/pdf"),
        (&["yaml", "yml"], "application/x-yaml"),
        (&["md"], "text/markdown"),
        (&["html", "htm"], "text/html"),
        (&["py"], "text/x-python"),
        (&["rs"], "text/x-rust"),
        (&["ts"], "text/x-typescript"),
        (&["js"], "application/javascript"),
        (&["wasm"], "application/wasm"),
        (&["toml"], "application/toml"),
        (&["xml"], "application/xml"),
        (&["zip"], "application/zip"),
        (&["tar"], "application/x-tar"),
        (&["gz"], "application/gzip"),
        (&["bin"], "application/octet-stream"),
        (&["pt", "pth", "ckpt", "safetensors"], "application/x-model"),
    ];

    let ext = file_path.extension()?.to_str()?.to_lowercase();
    MIME_TABLE
        .iter()
        .find(|(exts, _)| exts.contains(&ext.as_str()))
        .map(|(_, mime)| (*mime).to_string())
}
303
304#[allow(clippy::too_many_arguments)]
305fn add_file(
306 repo_root: &Path,
307 file_path: &Path,
308 store: &Store,
309 index: &mut Index,
310 compression: &Compression,
311 chunker_mode: &chunker::ChunkerMode,
312 cipher: Option<&encryption::RepoCipher>,
313 _json: bool,
314) -> Result<()> {
315 let file = fs::File::open(file_path)?;
316 let mut chunker = match chunker_mode {
317 chunker::ChunkerMode::Fixed { chunk_size } => {
318 chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
319 }
320 chunker::ChunkerMode::Rabin { min, avg, max } => {
321 chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
322 }
323 };
324 let mut chunk_hashes = Vec::new();
325 let mut total_size = 0;
326
327 while let Some(chunk) = chunker.next_chunk()? {
328 let hash = chunk.hash;
329 let compressed_data = compression.compress(&chunk.data)?;
330 let stored_data = match cipher {
331 Some(c) => c.encrypt(&compressed_data),
332 None => compressed_data,
333 };
334 let stored = crate::chunker::Chunk {
335 hash,
336 data: stored_data,
337 offset: chunk.offset,
338 };
339 store.put_chunk(&stored)?;
340 chunk_hashes.push(hash.to_hex().to_string());
341 total_size += chunk.data.len() as u64;
342 }
343
344 let name = relative_path(repo_root, file_path);
345 let manifest = FileManifest {
346 name: name.clone(),
347 size: total_size,
348 chunks: chunk_hashes.clone(),
349 content_type: detect_content_type(file_path),
350 compression: compression.as_str().to_string(),
351 merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
352 created_by: None,
353 created_at: None,
354 signature: None,
355 };
356
357 index.files.insert(name.clone(), manifest);
358 if !_json {
359 info!("Added {} ({})", name, total_size);
360 }
361 Ok(())
362}
363
364pub fn recover(path: &Path, json: bool) -> Result<()> {
365 crate::metrics::METRICS
366 .ops_recover
367 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
368 let shard_dir = path.join(".shard");
369 if !shard_dir.exists() {
370 anyhow::bail!("not a shard repository (run `shard init` first)");
371 }
372 wal::recover(&shard_dir)?;
373 if json {
374 info!(
375 "{}",
376 serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
377 );
378 } else {
379 info!("Recovery complete.");
380 }
381 Ok(())
382}
383
384pub fn health(path: &Path, json: bool) -> Result<()> {
385 use crate::metrics::METRICS;
386 METRICS
387 .ops_health
388 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
389
390 let shard_dir = path.join(".shard");
391 if !shard_dir.exists() {
392 anyhow::bail!("not a shard repository (run `shard init` first)");
393 }
394
395 let mut issues: Vec<String> = Vec::new();
396
397 let objects_ok = shard_dir.join("objects").exists();
398 if !objects_ok {
399 issues.push("objects directory missing".to_string());
400 }
401
402 let config_ok = shard_dir.join("config.json").exists();
403 let _config: Option<std::collections::BTreeMap<String, String>> = if config_ok {
404 match load_config(&shard_dir) {
405 Ok(c) => Some(c),
406 Err(e) => {
407 issues.push(format!("config.json corrupt: {}", e));
408 None
409 }
410 }
411 } else {
412 issues.push("config.json missing".to_string());
413 None
414 };
415
416 let keys_dir = shard_dir.join("keys");
417 let keys_ok = keys_dir.join("secret.key").exists() && keys_dir.join("public.key").exists();
418 if !keys_ok {
419 issues.push("keys missing (secret.key and/or public.key)".to_string());
420 }
421
422 let wal_has_entries = shard_dir.join("wal.log").exists() && {
423 std::fs::metadata(shard_dir.join("wal.log"))
424 .map(|m| m.len() > 0)
425 .unwrap_or(false)
426 };
427
428 let store_result = Store::open(&shard_dir);
429 let object_count = match &store_result {
430 Ok(store) => store.iter_chunks().map(|c| c.len() as u64).unwrap_or(0),
431 Err(e) => {
432 issues.push(format!("store error: {}", e));
433 0
434 }
435 };
436
437 let (branch, head) = branch::resolve_head(&shard_dir)
438 .ok()
439 .unwrap_or((None, None));
440
441 let branch_count = branch::list_branches(&shard_dir)
442 .map(|(_, b)| b.len() as u64)
443 .unwrap_or(0);
444
445 let tags_count = load_tags(&shard_dir).map(|t| t.len() as u64).unwrap_or(0);
446
447 let disk_usage = dir_size(&shard_dir);
448
449 let key_valid = load_keypair(&shard_dir).is_ok();
450
451 let metrics_snapshot = crate::metrics::METRICS.snapshot();
452
453 if json {
454 info!(
455 "{}",
456 serde_json::to_string(&serde_json::json!({
457 "status": if issues.is_empty() { "healthy" } else { "degraded" },
458 "shard_dir": shard_dir.display().to_string(),
459 "objects_ok": objects_ok,
460 "config_ok": config_ok,
461 "keys_ok": keys_ok && key_valid,
462 "wal_pending": wal_has_entries,
463 "object_count": object_count,
464 "current_branch": branch,
465 "head_commit": head,
466 "branch_count": branch_count,
467 "tag_count": tags_count,
468 "disk_usage_bytes": disk_usage,
469 "issues": issues,
470 "metrics": metrics_snapshot,
471 }))?
472 );
473 } else {
474 if issues.is_empty() {
475 info!("Health: OK");
476 } else {
477 info!("Health: DEGRADED");
478 for issue in &issues {
479 warn!(" - {}", issue);
480 }
481 }
482 info!(" Objects: {}", object_count);
483 info!(" Branches: {}, Tags: {}", branch_count, tags_count);
484 info!(" Current branch: {:?}", branch);
485 info!(" HEAD: {:?}", head);
486 info!(" WAL pending: {}", wal_has_entries);
487 info!(" Disk usage: {} bytes", disk_usage);
488 }
489
490 if !issues.is_empty() {
491 anyhow::bail!("health check found {} issue(s)", issues.len());
492 }
493 Ok(())
494}
495
496fn dir_size(path: &Path) -> u64 {
497 walkdir::WalkDir::new(path)
498 .into_iter()
499 .filter_map(|e| e.ok())
500 .filter(|e| e.file_type().is_file())
501 .filter_map(|e| e.metadata().ok())
502 .map(|m| m.len())
503 .sum()
504}
505
506pub fn get_metrics() -> metrics::MetricsSnapshot {
507 crate::metrics::METRICS.snapshot()
508}
509
510pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
511 crate::metrics::METRICS
512 .ops_add
513 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
514 let _guard = OP_QUEUES
515 .get_or_create(&path.to_path_buf())
516 .acquire(ops::OpKind::Write, "add".to_string());
517 let shard_dir = path.join(".shard");
518 if !shard_dir.exists() {
519 anyhow::bail!("not a shard repository (run `shard init` first)");
520 }
521
522 wal::recover(&shard_dir)?;
523
524 let config = load_config(&shard_dir)?;
525 let compression: Compression = config
526 .get("compression")
527 .map(|s| s.as_str())
528 .unwrap_or("zstd")
529 .parse()?;
530
531 let chunker_mode = chunker::ChunkerMode::from_config(&config);
532 let fmt = MetadataFormat::from_config(&config);
533
534 let store = Store::open(&shard_dir)?;
535 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
536
537 let cipher = maybe_load_cipher(&shard_dir)?;
538 let ignore_patterns = load_shardignore(path);
539
540 if file_path.is_dir() {
541 for entry in walkdir::WalkDir::new(file_path)
542 .into_iter()
543 .filter_entry(|e| {
544 let name = e.file_name().to_string_lossy();
545 if name == ".shard" || name == ".git" {
546 return false;
547 }
548 let rel = e.path().strip_prefix(file_path).unwrap_or(e.path());
549 if matches_ignore(rel, &ignore_patterns) {
550 return false;
551 }
552 true
553 })
554 {
555 let entry = entry?;
556 if entry.file_type().is_file() {
557 add_file(
558 path,
559 entry.path(),
560 &store,
561 &mut index,
562 &compression,
563 &chunker_mode,
564 cipher.as_ref(),
565 json,
566 )?;
567 }
568 }
569 } else {
570 add_file(
571 path,
572 file_path,
573 &store,
574 &mut index,
575 &compression,
576 &chunker_mode,
577 cipher.as_ref(),
578 json,
579 )?;
580 }
581
582 index.save(&shard_dir.join("index"), &fmt)?;
583 if json {
584 info!(
585 "{}",
586 serde_json::to_string(&serde_json::json!({"status": "added"}))?
587 );
588 }
589 Ok(())
590}
591
592pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
593 let _guard = OP_QUEUES
594 .get_or_create(&path.to_path_buf())
595 .acquire(ops::OpKind::Write, "commit".to_string());
596 let shard_dir = path.join(".shard");
597 if !shard_dir.exists() {
598 anyhow::bail!("not a shard repository (run `shard init` first)");
599 }
600
601 wal::recover(&shard_dir)?;
603
604 let config = load_config(&shard_dir)?;
605 let fmt = MetadataFormat::from_config(&config);
606
607 let store = Store::open(&shard_dir)?;
608 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
609
610 if index.files.is_empty() {
611 anyhow::bail!("nothing to commit (stage files with `shard add` first)");
612 }
613
614 let head_path = shard_dir.join("HEAD");
615
616 let wal = wal::Wal::new(&shard_dir);
618 let head_backup = fs::read_to_string(&head_path).ok();
619 let index_backup = fs::read(shard_dir.join("index"))?;
620 wal.append(&wal::WalEntry::CommitBegin {
621 head_backup,
622 index_backup,
623 })?;
624
625 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
627 let keys = load_keypair(&shard_dir)?;
628 let signing_key = keys.signing_key;
629 let mut manifest_ids = Vec::new();
630 for manifest in index.files.values_mut() {
631 manifest.created_by = Some(author.to_string());
632 manifest.created_at = Some(timestamp);
633
634 let mut unsigned = manifest.clone();
635 unsigned.signature = None;
636 let canonical = metadata::serialize_for_signing(&unsigned);
637 let sig = signing_key.sign(&canonical);
638 manifest.signature = Some(hex::encode(sig.to_bytes()));
639
640 let encoded = metadata::serialize(manifest, &fmt);
641 let hash = blake3::hash(&encoded);
642 let chunk = crate::chunker::Chunk {
643 hash,
644 data: encoded,
645 offset: 0,
646 };
647 store.put_chunk(&chunk)?;
648 manifest_ids.push(hash.to_hex().to_string());
649 }
650 manifest_ids.sort();
651
652 let mut parents = Vec::new();
654 let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
655 if let Some(pid) = parent_id {
656 parents.push(pid);
657 }
658
659 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
661 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
662 let mut commit = Commit {
663 commit_id: String::new(),
664 parents,
665 manifests: manifest_ids,
666 author: author.to_string(),
667 message: message.to_string(),
668 timestamp,
669 public_key: Some(public_key_hex),
670 signature: None,
671 key_id,
672 };
673
674 let json_unsigned = metadata::serialize_for_signing(&commit);
676 let signature = signing_key.sign(&json_unsigned);
677 commit.signature = Some(hex::encode(signature.to_bytes()));
678
679 let encoded = metadata::serialize(&commit, &fmt);
681 let hash = blake3::hash(&encoded);
682 let chunk = crate::chunker::Chunk {
683 hash,
684 data: encoded,
685 offset: 0,
686 };
687 store.put_chunk(&chunk)?;
688
689 let commit_id = hash.to_hex().to_string();
691 if has_dag_cycle(&store, &commit.parents, &commit_id)? {
692 anyhow::bail!(
693 "Cycle detected: commit {} is already an ancestor of one or more parents",
694 commit_id
695 );
696 }
697
698 if let Some(ref branch_name) = current_branch {
700 branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
701 branch::set_head_branch(&shard_dir, branch_name)?;
702 } else {
703 fs::write(&head_path, &commit_id)?;
704 }
705
706 index.files.clear();
708 index.save(&shard_dir.join("index"), &fmt)?;
709
710 wal.append(&wal::WalEntry::CommitEnd)?;
712 wal.truncate()?;
713
714 if json {
715 info!(
716 "{}",
717 serde_json::to_string(&serde_json::json!({
718 "commit_id": commit_id,
719 "message": message,
720 }))?
721 );
722 } else {
723 info!("Committed {} ({})", commit_id, message);
724 }
725 Ok(())
726}
727
/// Verifies the integrity and signatures of commit `commit_id`, stopping at the first
/// failure.
///
/// Checks, in order:
/// 1. The stored commit object hashes (BLAKE3) to `commit_id`.
/// 2. If the commit has a `key_id`, the keychain reports that key as valid at the
///    commit timestamp.
/// 3. The commit signature, if present.
/// 4. For every manifest:
///    * its object hash;
///    * its signature, if present;
///    * its merkle root, if present;
///    * that every chunk decrypts and decompresses to data whose BLAKE3 hash equals the
///      chunk id.
///
/// `commit_id` must be the full hex object id. It is compared verbatim with the stored
/// object's hash, and no branch/tag resolution is done here.
///
/// # Errors
/// Any failed check, or any I/O / decoding error, is returned as an error.
pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
    // Registered as a read operation on this repository's op queue.
    let _guard = OP_QUEUES
        .get_or_create(&path.to_path_buf())
        .acquire(ops::OpKind::Read, "verify".to_string());
    let shard_dir = path.join(".shard");
    if !shard_dir.exists() {
        anyhow::bail!("not a shard repository (run `shard init` first)");
    }

    if commit_id.len() < 2 {
        anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
    }
    let store = Store::open(&shard_dir)?;
    let cipher = maybe_load_cipher(&shard_dir)?;
    let commit_data = store.get_chunk(commit_id)?;

    // Check the object hash before deserializing anything from it.
    let stored_hash = blake3::hash(&commit_data);
    if stored_hash.to_hex().to_string() != commit_id {
        anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
    }

    let commit: Commit = metadata::deserialize(&commit_data)?;

    let mut sig_verified = false;
    let mut files_checked = 0u64;

    // NOTE(review): this only checks that `key_id` was active at `commit.timestamp`;
    // it does not tie `key_id` to `commit.public_key`, which is what the signature
    // below is actually checked against — confirm this is intended.
    if let Some(kid) = &commit.key_id {
        if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
            anyhow::bail!("Keychain verification failed: {}", e);
        } else if !json {
            info!("Keychain: key {} was active at commit time.", kid);
        }
    }

    if let Some(sig_hex) = &commit.signature {
        // Prefer the public key embedded in the commit; fall back to the repository key.
        // NOTE(review): the embedded key comes from the same object being verified, so
        // by itself a valid signature only shows self-consistency, not who signed it.
        let verifying_key = if let Some(pk_hex) = &commit.public_key {
            let pk_bytes = hex::decode(pk_hex)?;
            ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
        } else {
            let pub_bytes = load_public_key(&shard_dir)?;
            ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
        };

        // The signature covers the signing form of the commit with `signature` cleared.
        let mut unsigned_commit = commit.clone();
        unsigned_commit.signature = None;
        let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);

        let sig_bytes = hex::decode(sig_hex)?;
        let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);

        verifying_key.verify(&json_unsigned, &signature)?;
        sig_verified = true;
        if !json {
            info!("Signature verified.");
        }
    } else if !json {
        // NOTE(review): an unsigned commit still passes verification, and the warning
        // is emitted at info level rather than via `warn!`.
        info!("Warning: Commit is unsigned.");
    }

    for manifest_id in &commit.manifests {
        let manifest_data = store.get_chunk(manifest_id)?;
        let hash = blake3::hash(&manifest_data);
        if hash.to_hex().to_string() != *manifest_id {
            anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
        }

        let manifest: FileManifest = metadata::deserialize(&manifest_data)?;

        // Manifest signatures use the same key selection as the commit signature.
        // NOTE(review): manifests without a signature are accepted silently.
        if let Some(sig_hex) = &manifest.signature {
            let pk_bytes = if let Some(pk_hex) = &commit.public_key {
                hex::decode(pk_hex)?
            } else {
                load_public_key(&shard_dir)?
            };
            let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
            let mut unsigned = manifest.clone();
            unsigned.signature = None;
            let canonical = metadata::serialize_for_signing(&unsigned);
            let sig_bytes = hex::decode(sig_hex)?;
            let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
            vk.verify(&canonical, &sig)?;
            if !json {
                info!(" Manifest signature verified for: {}", manifest.name);
            }
        }

        let compression = manifest.compression.parse::<Compression>()?;
        if !json {
            info!(
                "Verifying file: {} (compression: {})",
                manifest.name, manifest.compression
            );
        }

        // The stored merkle root must match the one recomputed from the chunk list.
        if let Some(ref mr) = manifest.merkle_root {
            let computed = FileManifest::merkle_root(&manifest.chunks);
            if mr != &computed {
                anyhow::bail!(
                    "merkle root mismatch for '{}': manifest says {} but computed {}",
                    manifest.name,
                    mr,
                    computed
                );
            }
        }

        // Chunk ids are hashes of the plaintext, so undo encryption and compression
        // before hashing.
        for chunk_id in &manifest.chunks {
            let chunk_data = store.get_chunk(chunk_id)?;
            let decrypted = match &cipher {
                Some(c) => c.decrypt(&chunk_data)?,
                None => chunk_data,
            };
            let decompressed = compression.decompress(&decrypted)?;
            let hash = blake3::hash(&decompressed);
            if hash.to_hex().to_string() != *chunk_id {
                anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
            }
        }
        files_checked += 1;
    }

    if json {
        info!(
            "{}",
            serde_json::to_string(&serde_json::json!({
                "commit_id": commit_id,
                "verified": true,
                "signature_verified": sig_verified,
                "files_checked": files_checked,
            }))?
        );
    } else {
        info!("Verification successful.");
    }
    Ok(())
}
868
869fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
870 if commit_id.len() < 2 {
871 anyhow::bail!(
872 "commit id too short (got {} chars, need at least 2): '{}'",
873 commit_id.len(),
874 commit_id
875 );
876 };
877 let data = store.get_chunk(commit_id)?;
878 let mut commit: Commit = metadata::deserialize(&data)?;
879 commit.commit_id = commit_id.to_string();
880 Ok(commit)
881}
882
883fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
884 let mut seen = std::collections::HashSet::new();
885 let mut stack: Vec<String> = parents.to_vec();
886 while let Some(cid) = stack.pop() {
887 if cid == commit_id {
888 return Ok(true);
889 }
890 if !seen.insert(cid.clone()) {
891 continue;
892 }
893 if let Ok(data) = store.get_chunk(&cid) {
894 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
895 for p in &commit.parents {
896 stack.push(p.clone());
897 }
898 }
899 }
900 }
901 Ok(false)
902}
903
904#[derive(Serialize)]
905pub struct LogEntry {
906 pub commit_id: String,
907 pub parents: Vec<String>,
908 pub manifests: Vec<String>,
909 pub author: String,
910 pub message: String,
911 pub timestamp: u64,
912 pub signature: Option<String>,
913}
914
915impl From<Commit> for LogEntry {
916 fn from(c: Commit) -> Self {
917 LogEntry {
918 commit_id: c.commit_id,
919 parents: c.parents,
920 manifests: c.manifests,
921 author: c.author,
922 message: c.message,
923 timestamp: c.timestamp,
924 signature: c.signature,
925 }
926 }
927}
928
929pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
930 let shard_dir = path.join(".shard");
931 if !shard_dir.exists() {
932 anyhow::bail!("not a shard repository (run `shard init` first)");
933 }
934
935 let store = Store::open(&shard_dir)?;
936
937 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
938 let head = head_commit
939 .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
940
941 let mut entries: Vec<LogEntry> = Vec::new();
942 let mut seen = std::collections::HashSet::new();
943 let mut stack = vec![head];
944
945 while let Some(cid) = stack.pop() {
946 if !seen.insert(cid.clone()) {
947 continue;
948 }
949 let commit = load_commit(&store, &cid)?;
950 for parent in &commit.parents {
951 stack.push(parent.clone());
952 }
953 entries.push(commit.into());
954 }
955
956 if json {
957 info!("{}", serde_json::to_string_pretty(&entries)?);
958 } else {
959 for entry in &entries {
960 let ts = {
961 let secs = entry.timestamp as i64;
962 let tm = time::OffsetDateTime::from_unix_timestamp(secs)
963 .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
964 tm.format(&time::format_description::well_known::Rfc3339)
965 .unwrap_or_else(|_| entry.timestamp.to_string())
966 };
967 info!("commit {}", entry.commit_id);
968 if !entry.parents.is_empty() {
969 info!("parents: {}", entry.parents.join(" "));
970 }
971 info!("author: {}", entry.author);
972 info!("date: {}", ts);
973 info!("");
974 for line in entry.message.lines() {
975 info!(" {}", line);
976 }
977 info!("");
978 }
979 }
980
981 Ok(())
982}
983
984fn reconstruct_file(
985 store: &Store,
986 manifest: &FileManifest,
987 cipher: Option<&encryption::RepoCipher>,
988) -> Result<Vec<u8>> {
989 let compression: Compression = manifest.compression.parse()?;
990 let mut data = Vec::new();
991 for chunk_id in &manifest.chunks {
992 let chunk_data = store.get_chunk(chunk_id)?;
993 let decrypted = match cipher {
994 Some(c) => c.decrypt(&chunk_data)?,
995 None => chunk_data,
996 };
997 let decompressed = compression.decompress(&decrypted)?;
998 data.extend_from_slice(&decompressed);
999 }
1000 Ok(data)
1001}
1002
1003pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
1004 let shard_dir = path.join(".shard");
1005 if !shard_dir.exists() {
1006 anyhow::bail!("not a shard repository (run `shard init` first)");
1007 }
1008
1009 let store = Store::open(&shard_dir)?;
1010 let cipher = maybe_load_cipher(&shard_dir)?;
1011
1012 let cid_b = match commit_b {
1013 Some(c) => branch::resolve_rev(&shard_dir, c)?,
1014 None => {
1015 let (_, head) = branch::resolve_head(&shard_dir)?;
1016 head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
1017 }
1018 };
1019 let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;
1020
1021 let commit1 = load_commit(&store, &cid_a)?;
1022 let commit2 = load_commit(&store, &cid_b)?;
1023
1024 let mut files1: HashMap<String, FileManifest> = HashMap::new();
1025 for mid in &commit1.manifests {
1026 let data = store.get_chunk(mid)?;
1027 let m: FileManifest = metadata::deserialize(&data)?;
1028 files1.insert(m.name.clone(), m);
1029 }
1030
1031 let mut files2: HashMap<String, FileManifest> = HashMap::new();
1032 for mid in &commit2.manifests {
1033 let data = store.get_chunk(mid)?;
1034 let m: FileManifest = metadata::deserialize(&data)?;
1035 files2.insert(m.name.clone(), m);
1036 }
1037
1038 let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
1039 all_names.sort();
1040 all_names.dedup();
1041
1042 let mut changes: Vec<serde_json::Value> = Vec::new();
1043 let mut diff_found = false;
1044
1045 for name in all_names {
1046 match (files1.get(name), files2.get(name)) {
1047 (None, Some(manifest)) => {
1048 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
1049 let text = String::from_utf8_lossy(&content);
1050 diff_found = true;
1051 if json {
1052 changes.push(serde_json::json!({
1053 "type": "added",
1054 "file": name,
1055 "lines": text.lines().collect::<Vec<_>>(),
1056 }));
1057 } else {
1058 info!("--- /dev/null");
1059 info!("+++ b/{}", name);
1060 let lines: Vec<&str> = text.lines().collect();
1061 info!("@@ -0,0 +1,{} @@", lines.len());
1062 for line in &lines {
1063 info!("+{}", line);
1064 }
1065 }
1066 }
1067 (Some(manifest), None) => {
1068 let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
1069 let text = String::from_utf8_lossy(&content);
1070 diff_found = true;
1071 if json {
1072 changes.push(serde_json::json!({
1073 "type": "removed",
1074 "file": name,
1075 "lines": text.lines().collect::<Vec<_>>(),
1076 }));
1077 } else {
1078 info!("--- a/{}", name);
1079 info!("+++ /dev/null");
1080 let lines: Vec<&str> = text.lines().collect();
1081 info!("@@ -1,{} +0,0 @@", lines.len());
1082 for line in &lines {
1083 info!("-{}", line);
1084 }
1085 }
1086 }
1087 (Some(ma), Some(mb)) => {
1088 if ma.chunks == mb.chunks {
1089 continue;
1090 }
1091 let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
1092 let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
1093 diff_found = true;
1094 if json {
1095 let text_a = String::from_utf8_lossy(&content_a);
1096 let text_b = String::from_utf8_lossy(&content_b);
1097 changes.push(serde_json::json!({
1098 "type": "modified",
1099 "file": name,
1100 "old_lines": text_a.lines().collect::<Vec<_>>(),
1101 "new_lines": text_b.lines().collect::<Vec<_>>(),
1102 }));
1103 } else {
1104 let text_a = String::from_utf8_lossy(&content_a);
1105 let text_b = String::from_utf8_lossy(&content_b);
1106 let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
1107 let mut buf: Vec<u8> = Vec::new();
1108 diff.unified_diff()
1109 .header(&format!("a/{}", name), &format!("b/{}", name))
1110 .to_writer(&mut buf)
1111 .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
1112 let output = String::from_utf8_lossy(&buf);
1113 for line in output.lines() {
1114 info!("{}", line);
1115 }
1116 }
1117 }
1118 (None, None) => {}
1119 }
1120 }
1121
1122 if !diff_found {
1123 if json {
1124 info!(
1125 "{}",
1126 serde_json::to_string(
1127 &serde_json::json!({"changes": changes, "message": "no differences"})
1128 )?
1129 );
1130 } else {
1131 info!("No differences between the commits.");
1132 }
1133 return Ok(());
1134 }
1135
1136 if json {
1137 info!(
1138 "{}",
1139 serde_json::to_string(&serde_json::json!({"changes": changes}))?
1140 );
1141 }
1142
1143 Ok(())
1144}
1145
1146pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
1147 let _guard = OP_QUEUES
1148 .get_or_create(&path.to_path_buf())
1149 .acquire(ops::OpKind::Write, "checkout".to_string());
1150 let shard_dir = path.join(".shard");
1151 if !shard_dir.exists() {
1152 anyhow::bail!("not a shard repository (run `shard init` first)");
1153 }
1154
1155 let store = Store::open(&shard_dir)?;
1156 let cipher = maybe_load_cipher(&shard_dir)?;
1157
1158 let branch_path = shard_dir.join("refs").join("heads").join(target);
1161 let commit_id = if branch_path.exists() {
1162 fs::read_to_string(&branch_path)?.trim().to_string()
1163 } else {
1164 target.to_string()
1165 };
1166
1167 let commit = load_commit(&store, &commit_id)?;
1169
1170 if branch_path.exists() {
1172 branch::set_head_branch(&shard_dir, target)?;
1173 } else {
1174 branch::set_head_commit(&shard_dir, target)?;
1175 }
1176 let mut files = Vec::new();
1177
1178 for manifest_id in &commit.manifests {
1179 let data = store.get_chunk(manifest_id)?;
1180 let hash = blake3::hash(&data);
1181 if hash.to_hex().to_string() != *manifest_id {
1182 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
1183 }
1184 let manifest: FileManifest = metadata::deserialize(&data)?;
1185 let compression = manifest.compression.parse::<Compression>()?;
1186 if !json {
1187 info!(
1188 "Checking out file: {} (compression: {})",
1189 manifest.name, manifest.compression
1190 );
1191 }
1192
1193 let mut file_data = Vec::new();
1194 for chunk_id in &manifest.chunks {
1195 let chunk_data = store.get_chunk(chunk_id)?;
1196 let decrypted = match &cipher {
1197 Some(c) => c.decrypt(&chunk_data)?,
1198 None => chunk_data,
1199 };
1200 let decompressed = compression.decompress(&decrypted)?;
1201 file_data.extend_from_slice(&decompressed);
1202 }
1203 fs::write(path.join(&manifest.name), file_data)?;
1204 if !json {
1205 info!(" -> {}", manifest.name);
1206 }
1207 files.push(manifest.name);
1208 }
1209
1210 if json {
1211 info!(
1212 "{}",
1213 serde_json::to_string(&serde_json::json!({
1214 "commit_id": commit_id,
1215 "files": files,
1216 }))?
1217 );
1218 } else {
1219 info!("Checkout complete.");
1220 }
1221 Ok(())
1222}
1223
1224pub fn status(path: &Path, json: bool) -> Result<()> {
1225 let shard_dir = path.join(".shard");
1226 if !shard_dir.exists() {
1227 anyhow::bail!("not a shard repository (run `shard init` first)");
1228 }
1229
1230 let config = load_config(&shard_dir)?;
1231 let fmt = MetadataFormat::from_config(&config);
1232
1233 let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
1234 let mut commit_id: Option<String> = None;
1235 if let Some(cid) = head_commit {
1236 commit_id = Some(cid);
1237 if !json {
1238 if let Some(ref branch) = current_branch {
1239 info!("On branch: {}", branch);
1240 } else {
1241 info!("HEAD detached at {}", commit_id.as_ref().unwrap());
1242 }
1243 }
1244 } else if !json {
1245 info!("No commits yet.");
1246 }
1247
1248 let index = Index::load(&shard_dir.join("index"), &fmt)?;
1249 let staged: Vec<String> = index.files.keys().cloned().collect();
1250 if !json {
1251 if staged.is_empty() {
1252 info!("Nothing staged.");
1253 } else {
1254 info!("\nStaged files:");
1255 for name in &staged {
1256 info!(" {} (to be committed)", name);
1257 }
1258 }
1259 }
1260
1261 let store = Store::open(&shard_dir)?;
1262 let mut deleted = Vec::new();
1263 let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
1264 let mut names = std::collections::HashSet::new();
1265 if let Ok(commit) = load_commit(&store, head) {
1266 for manifest_id in &commit.manifests {
1267 if let Ok(data) = store.get_chunk(manifest_id) {
1268 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1269 let file_path = path.join(&manifest.name);
1270 if !file_path.exists() {
1271 deleted.push(manifest.name.clone());
1272 }
1273 names.insert(manifest.name);
1274 }
1275 }
1276 }
1277 }
1278 names
1279 } else {
1280 std::collections::HashSet::new()
1281 };
1282
1283 if !json && !deleted.is_empty() {
1284 info!("\nDeleted files:");
1285 for name in &deleted {
1286 info!(" {} (deleted)", name);
1287 }
1288 }
1289
1290 let mut untracked = Vec::new();
1291 let ignore_patterns = load_shardignore(path);
1292 for entry in walkdir::WalkDir::new(path)
1293 .min_depth(1)
1294 .into_iter()
1295 .filter_map(|e| e.ok())
1296 {
1297 if entry.file_type().is_file() || entry.file_type().is_dir() {
1298 let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1299 let name = rel_path.to_string_lossy().to_string();
1300 let is_hidden = rel_path.components().any(|c| {
1301 let name = c.as_os_str().to_string_lossy();
1302 name == ".shard" || name == ".git"
1303 });
1304 if !is_hidden
1305 && entry.path() != shard_dir
1306 && !entry.path().starts_with(&shard_dir)
1307 && !index.files.contains_key(&name)
1308 && !tracked_names.contains(&name)
1309 && entry.file_type().is_file()
1310 && !matches_ignore(rel_path, &ignore_patterns)
1311 {
1312 untracked.push(name);
1313 }
1314 }
1315 }
1316 if !json && !untracked.is_empty() {
1317 info!("\nUntracked files:");
1318 for name in &untracked {
1319 info!(" {}", name);
1320 }
1321 }
1322
1323 if json {
1324 info!(
1325 "{}",
1326 serde_json::to_string(&serde_json::json!({
1327 "commit": commit_id,
1328 "staged": staged,
1329 "deleted": deleted,
1330 "untracked": untracked,
1331 }))?
1332 );
1333 }
1334
1335 Ok(())
1336}
1337
1338fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1339 let config_path = shard_dir.join("config.json");
1340 let mut config: std::collections::BTreeMap<String, String> = if config_path.exists() {
1341 let data = fs::read(&config_path)?;
1342 metadata::deserialize(&data)?
1343 } else {
1344 std::collections::BTreeMap::new()
1345 };
1346 let overrides = config::load_env_overrides();
1347 for (k, v) in overrides {
1348 config.insert(k, v);
1349 }
1350 Ok(config)
1351}
1352
1353fn save_config(
1354 shard_dir: &Path,
1355 config: &std::collections::BTreeMap<String, String>,
1356) -> Result<()> {
1357 let fmt = MetadataFormat::from_config(config);
1358 let data = metadata::serialize(config, &fmt);
1359 fs::write(shard_dir.join("config.json"), data)?;
1360 Ok(())
1361}
1362
/// Per-user key directory, `$HOME/.shard/keys`.
///
/// Returns `None` when `HOME` is unset or not valid Unicode.
fn global_keys_dir() -> Option<PathBuf> {
    let home = std::env::var("HOME").ok()?;
    let mut dir = PathBuf::from(home);
    dir.push(".shard");
    dir.push("keys");
    Some(dir)
}
1368
1369fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1370 let local = shard_dir.join("keys");
1371 if local.join("secret.key").exists() {
1372 match KeyPair::load(&local) {
1373 Ok(kp) => return Ok(kp),
1374 Err(e) => {
1375 if e.to_string().contains("encrypted") {
1376 if let Some(passphrase) = get_cached_passphrase() {
1377 return KeyPair::load_with_passphrase(&local, &passphrase);
1378 }
1379 anyhow::bail!(
1380 "secret.key is encrypted. Use `shard unlock --passphrase <pass>` or provide --passphrase"
1381 );
1382 }
1383 return Err(e);
1384 }
1385 }
1386 }
1387 if let Some(global) = global_keys_dir() {
1388 if global.join("secret.key").exists() {
1389 match KeyPair::load(&global) {
1390 Ok(kp) => return Ok(kp),
1391 Err(e) => {
1392 if e.to_string().contains("encrypted") {
1393 if let Some(passphrase) = get_cached_passphrase() {
1394 return KeyPair::load_with_passphrase(&global, &passphrase);
1395 }
1396 }
1397 return Err(e);
1398 }
1399 }
1400 }
1401 }
1402 anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1403}
1404
1405fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1406 let local = shard_dir.join("keys/public.key");
1407 if local.exists() {
1408 return Ok(fs::read(&local)?);
1409 }
1410 if let Some(global) = global_keys_dir() {
1411 let gp = global.join("public.key");
1412 if gp.exists() {
1413 return Ok(fs::read(&gp)?);
1414 }
1415 }
1416 anyhow::bail!(
1417 "no public key found in {} or ~/.shard/keys/",
1418 local.display()
1419 )
1420}
1421
1422fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1423 let config = load_config(shard_dir)?;
1424 if config.get("private").map(|s| s.as_str()) == Some("true") {
1425 let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1426 Ok(Some(encryption::RepoCipher::from_key(&key)))
1427 } else {
1428 Ok(None)
1429 }
1430}
1431
1432pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1433 let shard_dir = path.join(".shard");
1434 if !shard_dir.exists() {
1435 anyhow::bail!("not a shard repository (run `shard init` first)");
1436 }
1437 let config = load_config(&shard_dir)?;
1438 if let Some(key) = key {
1439 match config.get(key) {
1440 Some(value) => info!("{} = {}", key, value),
1441 None => anyhow::bail!("config key not found: {}", key),
1442 }
1443 } else {
1444 for (k, v) in &config {
1445 info!("{} = {}", k, v);
1446 }
1447 }
1448 Ok(())
1449}
1450
1451pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1452 let shard_dir = path.join(".shard");
1453 if !shard_dir.exists() {
1454 anyhow::bail!("not a shard repository (run `shard init` first)");
1455 }
1456 let mut config = load_config(&shard_dir)?;
1457 config.insert(key.to_string(), value.to_string());
1458 save_config(&shard_dir, &config)?;
1459 info!("{} = {}", key, value);
1460 Ok(())
1461}
1462
1463pub(crate) fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1464 let tags_path = shard_dir.join("tags.json");
1465 if tags_path.exists() {
1466 let data = fs::read(&tags_path)?;
1467 Ok(serde_json::from_slice(&data)?)
1468 } else {
1469 Ok(std::collections::BTreeMap::new())
1470 }
1471}
1472
1473pub(crate) fn save_tags(
1474 shard_dir: &Path,
1475 tags: &std::collections::BTreeMap<String, String>,
1476) -> Result<()> {
1477 let data = serde_json::to_string_pretty(tags)?;
1478 fs::write(shard_dir.join("tags.json"), data)?;
1479 Ok(())
1480}
1481
1482pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1483 let shard_dir = path.join(".shard");
1484 if !shard_dir.exists() {
1485 anyhow::bail!("not a shard repository (run `shard init` first)");
1486 }
1487 let store = Store::open(&shard_dir)?;
1489 load_commit(&store, commit_id)?;
1490 let mut tags = load_tags(&shard_dir)?;
1491 tags.insert(name.to_string(), commit_id.to_string());
1492 save_tags(&shard_dir, &tags)?;
1493 info!("Tagged '{}' -> {}", name, commit_id);
1494 Ok(())
1495}
1496
1497pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1498 let shard_dir = path.join(".shard");
1499 if !shard_dir.exists() {
1500 anyhow::bail!("not a shard repository (run `shard init` first)");
1501 }
1502 let id = match commit_id {
1503 Some(cid) => cid.to_string(),
1504 None => {
1505 let (_, head) = branch::resolve_head(&shard_dir)?;
1506 head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1507 }
1508 };
1509 let store = Store::open(&shard_dir)?;
1511 load_commit(&store, &id)?;
1512 branch::create_branch(&shard_dir, name, &id)
1513}
1514
1515pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1516 let shard_dir = path.join(".shard");
1517 if !shard_dir.exists() {
1518 anyhow::bail!("not a shard repository (run `shard init` first)");
1519 }
1520 branch::delete_branch(&shard_dir, name)
1521}
1522
1523pub fn branch_list(path: &Path) -> Result<()> {
1524 let shard_dir = path.join(".shard");
1525 if !shard_dir.exists() {
1526 anyhow::bail!("not a shard repository (run `shard init` first)");
1527 }
1528 let (current, branches) = branch::list_branches(&shard_dir)?;
1529 if branches.is_empty() {
1530 info!("No branches.");
1531 return Ok(());
1532 }
1533 for (name, commit_id) in &branches {
1534 let prefix = if current.as_deref() == Some(name) {
1535 "* "
1536 } else {
1537 " "
1538 };
1539 info!(
1540 "{}{} ({})",
1541 prefix,
1542 name,
1543 &commit_id[..8.min(commit_id.len())]
1544 );
1545 }
1546 Ok(())
1547}
1548
1549pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1550 let _guard = OP_QUEUES
1551 .get_or_create(&path.to_path_buf())
1552 .acquire(ops::OpKind::Write, "merge".to_string());
1553 let shard_dir = path.join(".shard");
1554 if !shard_dir.exists() {
1555 anyhow::bail!("not a shard repository (run `shard init` first)");
1556 }
1557
1558 let store = Store::open(&shard_dir)?;
1559
1560 let config = load_config(&shard_dir)?;
1561 let fmt = MetadataFormat::from_config(&config);
1562
1563 let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1565 let current_id =
1566 current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1567
1568 let source_id = branch::resolve_rev(&shard_dir, branch)?;
1570 if source_id == current_id {
1571 anyhow::bail!("Already up to date — source is the same commit as HEAD");
1572 }
1573
1574 let current_commit = load_commit(&store, ¤t_id)?;
1576 let source_commit = load_commit(&store, &source_id)?;
1577
1578 let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1580 std::collections::HashMap::new();
1581
1582 for manifest_id in ¤t_commit.manifests {
1583 let data = store.get_chunk(manifest_id)?;
1584 let manifest: FileManifest = metadata::deserialize(&data)?;
1585 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1586 }
1587
1588 for manifest_id in &source_commit.manifests {
1589 let data = store.get_chunk(manifest_id)?;
1590 let manifest: FileManifest = metadata::deserialize(&data)?;
1591 merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1592 }
1593
1594 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1596 let keys = load_keypair(&shard_dir)?;
1597 let signing_key = keys.signing_key;
1598 let mut merged_manifest_ids = Vec::new();
1599 for (name, chunks) in merged_manifests.values() {
1600 let compression = Compression::None;
1601 let mut manifest = FileManifest {
1602 name: name.clone(),
1603 size: 0,
1604 chunks: chunks.clone(),
1605 content_type: None,
1606 compression: compression.as_str().to_string(),
1607 merkle_root: Some(FileManifest::merkle_root(chunks)),
1608 created_by: Some(author.to_string()),
1609 created_at: Some(timestamp),
1610 signature: None,
1611 };
1612
1613 let mut unsigned = manifest.clone();
1614 unsigned.signature = None;
1615 let canonical = metadata::serialize_for_signing(&unsigned);
1616 let sig = signing_key.sign(&canonical);
1617 manifest.signature = Some(hex::encode(sig.to_bytes()));
1618
1619 let encoded = metadata::serialize(&manifest, &fmt);
1620 let hash = blake3::hash(&encoded);
1621 store.put_chunk(&crate::chunker::Chunk {
1622 hash,
1623 data: encoded,
1624 offset: 0,
1625 })?;
1626 merged_manifest_ids.push(hash.to_hex().to_string());
1627 }
1628 merged_manifest_ids.sort();
1629
1630 let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1632 let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1633 let parents = vec![current_id.clone(), source_id.clone()];
1634 let mut commit = Commit {
1635 commit_id: String::new(),
1636 parents,
1637 manifests: merged_manifest_ids,
1638 author: author.to_string(),
1639 message: message.to_string(),
1640 timestamp,
1641 public_key: Some(public_key_hex),
1642 signature: None,
1643 key_id,
1644 };
1645
1646 let json_unsigned = metadata::serialize_for_signing(&commit);
1647 let signature = signing_key.sign(&json_unsigned);
1648 commit.signature = Some(hex::encode(signature.to_bytes()));
1649
1650 let encoded = metadata::serialize(&commit, &fmt);
1651 let hash = blake3::hash(&encoded);
1652 store.put_chunk(&crate::chunker::Chunk {
1653 hash,
1654 data: encoded,
1655 offset: 0,
1656 })?;
1657
1658 let merge_commit_id = hash.to_hex().to_string();
1659
1660 if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1662 anyhow::bail!(
1663 "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1664 merge_commit_id
1665 );
1666 }
1667
1668 if let Some(ref branch_name) = current_branch {
1670 branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1671 branch::set_head_branch(&shard_dir, branch_name)?;
1672 } else {
1673 branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1674 }
1675
1676 if json {
1677 info!(
1678 "{}",
1679 serde_json::to_string(&serde_json::json!({
1680 "commit_id": merge_commit_id,
1681 "message": message,
1682 }))?
1683 );
1684 } else {
1685 info!("Merge commit {} ({})", merge_commit_id, message);
1686 }
1687 Ok(())
1688}
1689
1690pub fn tag_list(path: &Path) -> Result<()> {
1691 let shard_dir = path.join(".shard");
1692 if !shard_dir.exists() {
1693 anyhow::bail!("not a shard repository (run `shard init` first)");
1694 }
1695 let tags = load_tags(&shard_dir)?;
1696 if tags.is_empty() {
1697 info!("No tags.");
1698 } else {
1699 for (name, commit_id) in &tags {
1700 info!("{} -> {}", name, commit_id);
1701 }
1702 }
1703 Ok(())
1704}
1705
1706fn collect_reachable(
1707 store: &Store,
1708 commit_id: &str,
1709 seen_commits: &mut std::collections::HashSet<String>,
1710 reachable: &mut std::collections::HashSet<String>,
1711) -> Result<()> {
1712 if !seen_commits.insert(commit_id.to_string()) {
1713 return Ok(());
1714 }
1715
1716 reachable.insert(commit_id.to_string());
1717
1718 let commit = match load_commit(store, commit_id) {
1719 Ok(c) => c,
1720 Err(_) => return Ok(()),
1721 };
1722
1723 for manifest_id in &commit.manifests {
1724 reachable.insert(manifest_id.clone());
1725
1726 if let Ok(data) = store.get_chunk(manifest_id) {
1727 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1728 for chunk_id in &manifest.chunks {
1729 reachable.insert(chunk_id.clone());
1730 }
1731 }
1732 }
1733 }
1734
1735 for parent_id in &commit.parents {
1736 collect_reachable(store, parent_id, seen_commits, reachable)?;
1737 }
1738
1739 Ok(())
1740}
1741
1742pub fn prune(path: &Path, json: bool) -> Result<()> {
1743 let _guard = OP_QUEUES
1744 .get_or_create(&path.to_path_buf())
1745 .acquire(ops::OpKind::Write, "prune".to_string());
1746 let shard_dir = path.join(".shard");
1747 if !shard_dir.exists() {
1748 anyhow::bail!("not a shard repository (run `shard init` first)");
1749 }
1750
1751 let config = load_config(&shard_dir)?;
1752 let fmt = MetadataFormat::from_config(&config);
1753
1754 let store = Store::open(&shard_dir)?;
1755 let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1756
1757 let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1759 if let Some(ref head) = head_commit {
1760 collect_reachable(
1761 &store,
1762 head,
1763 &mut std::collections::HashSet::new(),
1764 &mut reachable,
1765 )?;
1766 }
1767
1768 if let Ok(branches) = branch::list_branches(&shard_dir) {
1770 for (_, commit_id) in branches.1 {
1771 collect_reachable(
1772 &store,
1773 &commit_id,
1774 &mut std::collections::HashSet::new(),
1775 &mut reachable,
1776 )?;
1777 }
1778 }
1779
1780 let tags = load_tags(&shard_dir)?;
1782 for commit_id in tags.values() {
1783 collect_reachable(
1784 &store,
1785 commit_id,
1786 &mut std::collections::HashSet::new(),
1787 &mut reachable,
1788 )?;
1789 }
1790
1791 let index = Index::load(&shard_dir.join("index"), &fmt)?;
1793 for manifest in index.files.values() {
1794 let json = metadata::serialize(manifest, &fmt);
1795 let hash = blake3::hash(&json);
1796 let hash_hex = hash.to_hex().to_string();
1797 reachable.insert(hash_hex);
1798 for chunk_hash in &manifest.chunks {
1799 reachable.insert(chunk_hash.clone());
1800 }
1801 }
1802
1803 let all_chunks = store.iter_chunks()?;
1805 let mut pruned = 0u64;
1806 let mut kept = 0u64;
1807 for (hash_hex, full_path) in &all_chunks {
1808 if !reachable.contains(hash_hex) {
1809 store.delete_chunk(hash_hex, Some(full_path))?;
1810 pruned += 1;
1811 } else {
1812 kept += 1;
1813 }
1814 }
1815
1816 if json {
1817 info!(
1818 "{}",
1819 serde_json::to_string(&serde_json::json!({
1820 "pruned": pruned,
1821 "remaining": kept,
1822 }))?
1823 );
1824 } else {
1825 info!("Pruned {} objects. {} objects remain.", pruned, kept);
1826 }
1827 Ok(())
1828}
1829
1830pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1831 let shard_dir = path.join(".shard");
1832 if !shard_dir.exists() {
1833 anyhow::bail!("not a shard repository (run `shard init` first)");
1834 }
1835
1836 if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1838 anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1839 }
1840
1841 let peers_path = shard_dir.join("peers.json");
1842 let mut peers: Vec<String> = if peers_path.exists() {
1843 let data = fs::read(&peers_path)?;
1844 serde_json::from_slice(&data)?
1845 } else {
1846 Vec::new()
1847 };
1848
1849 let added = if !peers.contains(&multiaddr.to_string()) {
1850 peers.push(multiaddr.to_string());
1851 let data = serde_json::to_vec(&peers)?;
1852 fs::write(peers_path, data)?;
1853 true
1854 } else {
1855 false
1856 };
1857
1858 if json {
1859 info!(
1860 "{}",
1861 serde_json::to_string(&serde_json::json!({
1862 "multiaddr": multiaddr,
1863 "added": added,
1864 }))?
1865 );
1866 } else if added {
1867 info!("Added peer: {}", multiaddr);
1868 } else {
1869 info!("Peer already exists: {}", multiaddr);
1870 }
1871
1872 Ok(())
1873}
1874
1875fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1876 let peers_path = shard_dir.join("peers.json");
1877 if peers_path.exists() {
1878 let data = fs::read(peers_path)?;
1879 Ok(serde_json::from_slice(&data)?)
1880 } else {
1881 Ok(Vec::new())
1882 }
1883}
1884
/// Location of the file listing the public keys allowed to authenticate to this repository.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut file = shard_dir.to_path_buf();
    file.push("authorized_keys");
    file
}
1888
1889struct AuthorizedKeyEntry {
1891 key: ed25519_dalek::VerifyingKey,
1892 comment: String,
1893}
1894
1895fn parse_authorized_key_line(line: &str) -> Option<AuthorizedKeyEntry> {
1896 let line = line.trim();
1897 if line.is_empty() || line.starts_with('#') {
1898 return None;
1899 }
1900 let parts: Vec<&str> = line.splitn(2, char::is_whitespace).collect();
1902 let hex_key = parts[0].trim();
1903 let comment = parts.get(1).unwrap_or(&"").trim().to_string();
1904 let bytes = hex::decode(hex_key).ok()?;
1905 let arr: [u8; 32] = bytes.try_into().ok()?;
1906 let key = ed25519_dalek::VerifyingKey::from_bytes(&arr).ok()?;
1907 Some(AuthorizedKeyEntry { key, comment })
1908}
1909
1910fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1911 let path = authorized_keys_path(shard_dir);
1912 if !path.exists() {
1913 return Ok(Vec::new());
1914 }
1915 let content = fs::read_to_string(&path)?;
1916 let mut keys = Vec::new();
1917 for line in content.lines() {
1918 if let Some(entry) = parse_authorized_key_line(line) {
1919 keys.push(entry.key);
1920 }
1921 }
1922 Ok(keys)
1923}
1924
1925pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1926 let bytes = hex::decode(public_key_hex)?;
1928 let arr: [u8; 32] = bytes
1929 .as_slice()
1930 .try_into()
1931 .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1932 let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1933
1934 let path = authorized_keys_path(shard_dir);
1935 let mut content = if path.exists() {
1936 fs::read_to_string(&path)?
1937 } else {
1938 String::new()
1939 };
1940 if content.lines().any(|l| {
1942 if let Some(entry) = parse_authorized_key_line(l) {
1943 entry.key.to_bytes().to_vec() == arr.to_vec()
1944 } else {
1945 false
1946 }
1947 }) {
1948 info!("Key already authorized");
1949 return Ok(());
1950 }
1951 content.push_str(public_key_hex);
1952 content.push('\n');
1953 fs::write(&path, content)?;
1954 info!("Authorized key added");
1955 Ok(())
1956}
1957
1958pub fn remove_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1959 let path = authorized_keys_path(shard_dir);
1960 if !path.exists() {
1961 anyhow::bail!("authorized_keys file does not exist");
1962 }
1963 let content = fs::read_to_string(&path)?;
1964 let new_content: Vec<&str> = content
1965 .lines()
1966 .filter(|l| {
1967 if let Some(entry) = parse_authorized_key_line(l) {
1968 hex::encode(entry.key.to_bytes()) != public_key_hex
1969 } else {
1970 true
1971 }
1972 })
1973 .collect();
1974 if new_content.len() == content.lines().count() {
1975 anyhow::bail!("Key not found in authorized_keys");
1976 }
1977 let has_keys = new_content
1978 .iter()
1979 .any(|l| parse_authorized_key_line(l).is_some());
1980 if has_keys {
1981 fs::write(&path, new_content.join("\n") + "\n")?;
1982 } else {
1983 fs::remove_file(&path)?;
1984 }
1985 info!("Authorized key removed");
1986 Ok(())
1987}
1988
1989pub fn list_authorized_keys(shard_dir: &Path, json: bool) -> Result<()> {
1990 let path = authorized_keys_path(shard_dir);
1991 if !path.exists() {
1992 if json {
1993 info!(
1994 "{}",
1995 serde_json::to_string(&serde_json::json!({"keys": []}))?
1996 );
1997 } else {
1998 println!("No authorized keys.");
1999 }
2000 return Ok(());
2001 }
2002 let content = fs::read_to_string(&path)?;
2003 let mut entries = Vec::new();
2004 for line in content.lines() {
2005 if let Some(entry) = parse_authorized_key_line(line) {
2006 entries.push(serde_json::json!({
2007 "key": hex::encode(entry.key.to_bytes()),
2008 "comment": entry.comment,
2009 }));
2010 }
2011 }
2012 if entries.is_empty() {
2013 if json {
2014 info!(
2015 "{}",
2016 serde_json::to_string(&serde_json::json!({"keys": []}))?
2017 );
2018 } else {
2019 println!("No authorized keys.");
2020 }
2021 return Ok(());
2022 }
2023 if json {
2024 info!(
2025 "{}",
2026 serde_json::to_string(&serde_json::json!({"keys": entries}))?
2027 );
2028 } else {
2029 for e in &entries {
2030 let comment = e["comment"].as_str().unwrap_or("");
2031 if comment.is_empty() {
2032 println!(" {}", e["key"].as_str().unwrap_or(""));
2033 } else {
2034 println!(" {} {}", e["key"].as_str().unwrap_or(""), comment);
2035 }
2036 }
2037 }
2038 Ok(())
2039}
2040
2041pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
2042 let shard_dir = path.join(".shard");
2043 if !shard_dir.exists() {
2044 anyhow::bail!("not a shard repository (run `shard init` first)");
2045 }
2046 let file = fs::File::create(output)?;
2047 let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
2048 let mut archive = tar::Builder::new(encoder);
2049 archive.append_dir_all(".shard", &shard_dir)?;
2050 archive.finish()?;
2051 if json {
2052 info!(
2053 "{}",
2054 serde_json::to_string(&serde_json::json!({
2055 "path": output.to_string_lossy(),
2056 }))?
2057 );
2058 } else {
2059 info!("Backup created: {}", output.display());
2060 }
2061 Ok(())
2062}
2063
2064pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
2065 let shard_dir = path.join(".shard");
2066 if !shard_dir.exists() {
2067 anyhow::bail!("not a shard repository (run `shard init` first)");
2068 }
2069 let store = Store::open(&shard_dir)?;
2070 let cipher = maybe_load_cipher(&shard_dir)?;
2071 let commit = load_commit(&store, commit_id)?;
2072 let mut files = Vec::new();
2073 for manifest_id in &commit.manifests {
2074 let data = store.get_chunk(manifest_id)?;
2075 let manifest: FileManifest = metadata::deserialize(&data)?;
2076 let compression = manifest.compression.parse::<Compression>()?;
2077 if !json {
2078 info!("Exporting file: {}", manifest.name);
2079 }
2080 let mut file_data = Vec::new();
2081 for chunk_id in &manifest.chunks {
2082 let chunk_data = store.get_chunk(chunk_id)?;
2083 let decrypted = match &cipher {
2084 Some(c) => c.decrypt(&chunk_data)?,
2085 None => chunk_data,
2086 };
2087 let decompressed = compression.decompress(&decrypted)?;
2088 file_data.extend_from_slice(&decompressed);
2089 }
2090 let out_path = output_dir.join(&manifest.name);
2091 if let Some(parent) = out_path.parent() {
2092 fs::create_dir_all(parent)?;
2093 }
2094 fs::write(&out_path, file_data)?;
2095 if !json {
2096 info!(" -> {}", out_path.display());
2097 }
2098 files.push(manifest.name);
2099 }
2100 if json {
2101 info!(
2102 "{}",
2103 serde_json::to_string(&serde_json::json!({
2104 "commit_id": commit_id,
2105 "files": files,
2106 "output_dir": output_dir.to_string_lossy(),
2107 }))?
2108 );
2109 } else {
2110 info!("Export complete.");
2111 }
2112 Ok(())
2113}
2114
2115pub fn import(
2116 path: &Path,
2117 source_dir: &Path,
2118 message: &str,
2119 author: &str,
2120 json: bool,
2121) -> Result<()> {
2122 let shard_dir = path.join(".shard");
2123 if !shard_dir.exists() {
2124 anyhow::bail!("not a shard repository (run `shard init` first)");
2125 }
2126 let config = load_config(&shard_dir)?;
2128 let compression: Compression = config
2129 .get("compression")
2130 .map(|s| s.as_str())
2131 .unwrap_or("zstd")
2132 .parse()?;
2133 let chunker_mode = chunker::ChunkerMode::from_config(&config);
2134 let fmt = MetadataFormat::from_config(&config);
2135 let store = Store::open(&shard_dir)?;
2136 let cipher = maybe_load_cipher(&shard_dir)?;
2137 let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
2138 if !source_dir.is_dir() {
2139 anyhow::bail!("Source must be a directory");
2140 }
2141 for entry in walkdir::WalkDir::new(source_dir)
2142 .into_iter()
2143 .filter_entry(|e| {
2144 e.file_name()
2145 .to_str()
2146 .map(|s| !s.starts_with('.'))
2147 .unwrap_or(false)
2148 })
2149 {
2150 let entry = entry?;
2151 if entry.file_type().is_file() {
2152 add_file(
2153 path,
2154 entry.path(),
2155 &store,
2156 &mut index,
2157 &compression,
2158 &chunker_mode,
2159 cipher.as_ref(),
2160 json,
2161 )?;
2162 }
2163 }
2164 index.save(&shard_dir.join("index"), &fmt)?;
2165 if !index.files.is_empty() {
2167 commit(path, message, author, json)?;
2168 } else if json {
2169 info!(
2170 "{}",
2171 serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
2172 );
2173 } else {
2174 info!("No files found to import.");
2175 }
2176 Ok(())
2177}
2178
2179pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
2180 let shard_dir = path.join(".shard");
2181 if shard_dir.exists() {
2182 anyhow::bail!(
2183 "Repository already exists — remove .shard first or use a different directory"
2184 );
2185 }
2186 let file = fs::File::open(backup_file)?;
2187 let decoder = flate2::read::GzDecoder::new(file);
2188 let mut archive = tar::Archive::new(decoder);
2189 archive.unpack(path)?;
2190 if !path.join(".shard").exists() {
2192 anyhow::bail!("Backup does not contain a valid .shard directory");
2193 }
2194 if json {
2195 info!(
2196 "{}",
2197 serde_json::to_string(&serde_json::json!({
2198 "backup": backup_file.to_string_lossy(),
2199 }))?
2200 );
2201 } else {
2202 info!("Restored from {}", backup_file.display());
2203 }
2204 Ok(())
2205}
2206
2207struct RepoProvider {
2208 store: Store,
2209 shard_dir: std::path::PathBuf,
2210}
2211
2212impl shard_net::p2p::ShardContentProvider for RepoProvider {
2213 fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
2214 self.store.get_chunk(id).ok()
2215 }
2216 fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
2217 self.store.get_chunk(id).ok()
2218 }
2219 fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
2220 let hash = blake3::hash(data);
2221 let hex = hash.to_hex().to_string();
2222 if hex != id {
2223 return false;
2224 }
2225 self.store
2226 .put_chunk(&crate::chunker::Chunk {
2227 hash,
2228 data: data.to_vec(),
2229 offset: 0,
2230 })
2231 .is_ok()
2232 }
2233 fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
2234 use ed25519_dalek::Verifier;
2235 let pk_bytes: [u8; 32] = match public_key.try_into() {
2236 Ok(b) => b,
2237 Err(_) => return false,
2238 };
2239 let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
2240 Ok(k) => k,
2241 Err(_) => return false,
2242 };
2243 let sig_bytes: [u8; 64] = match signature.try_into() {
2244 Ok(b) => b,
2245 Err(_) => return false,
2246 };
2247 let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
2248 if pk.verify(nonce, &sig).is_err() {
2249 return false;
2250 }
2251 let auth_path = self.shard_dir.join("authorized_keys");
2255 if auth_path.exists() {
2256 if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
2257 return keys.contains(&pk);
2258 }
2259 return false;
2260 }
2261 true
2262 }
2263 fn repo_public_key(&self) -> Option<Vec<u8>> {
2264 let keys = load_keypair(&self.shard_dir).ok()?;
2265 Some(keys.verifying_key.to_bytes().to_vec())
2266 }
2267}
2268
2269fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
2271 let store = Store::open(shard_dir)?;
2272 let commit = load_commit(&store, commit_id)?;
2273 let mut file_count = 0u64;
2274 let mut total_size = 0u64;
2275 for mid in &commit.manifests {
2276 if let Ok(data) = store.get_chunk(mid) {
2277 if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
2278 file_count += 1;
2279 total_size += m.size;
2280 }
2281 }
2282 }
2283 Ok((file_count, total_size))
2284}
2285
2286pub async fn share(path: &Path, json: bool) -> Result<()> {
2287 let shard_dir = path.join(".shard");
2288 if !shard_dir.exists() {
2289 anyhow::bail!("not a shard repository (run `shard init` first)");
2290 }
2291
2292 let mut node = shard_net::p2p::Node::new().await?;
2293
2294 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2296 node.subscribe(&ann_topic)?;
2297
2298 let peers = load_peers(&shard_dir)?;
2300 for peer in peers {
2301 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2302 let _ = node.swarm.dial(addr);
2303 }
2304 }
2305
2306 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2308
2309 node.listen("/ip4/0.0.0.0/tcp/0").await?;
2310 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2311 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2312
2313 let config = load_config(&shard_dir)?;
2314 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2315
2316 let store = Store::open(&shard_dir)?;
2317 let provider = RepoProvider {
2318 store,
2319 shard_dir: shard_dir.clone(),
2320 };
2321
2322 if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
2324 let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
2325 let ann = shard_net::protocol::Announcement {
2326 commit_id: head,
2327 file_count,
2328 total_size,
2329 repo_name: repo_name.clone(),
2330 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2331 };
2332 let payload = serde_json::to_vec(&ann)?;
2333 let _ = node.publish(&ann_topic, payload);
2334 }
2335
2336 if json {
2337 info!(
2338 "{}",
2339 serde_json::to_string(&serde_json::json!({
2340 "status": "sharing",
2341 "peer_id": node.local_peer_id().to_string(),
2342 }))?
2343 );
2344 } else {
2345 info!("Sharing repository...");
2346 }
2347 node.run(provider).await;
2348
2349 Ok(())
2350}
2351
2352pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
2355 let mut node = shard_net::p2p::Node::new().await?;
2356 node.listen(listen_addr).await?;
2357 if json {
2358 info!(
2359 "{}",
2360 serde_json::to_string(&serde_json::json!({
2361 "status": "relay active",
2362 "listen": listen_addr,
2363 "peer_id": node.local_peer_id().to_string(),
2364 }))?
2365 );
2366 } else {
2367 info!("Relay server active on {}", listen_addr);
2368 info!("Peer ID: {}", node.local_peer_id());
2369 info!("Ready to accept circuit relay v2 reservations");
2370 }
2371 node.run(EmptyProvider).await;
2372 Ok(())
2373}
2374
2375struct EmptyProvider;
2377impl shard_net::p2p::ShardContentProvider for EmptyProvider {
2378 fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
2379 None
2380 }
2381 fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
2382 None
2383 }
2384 fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
2385 false
2386 }
2387 fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
2388 false
2389 }
2390 fn repo_public_key(&self) -> Option<Vec<u8>> {
2391 None
2392 }
2393}
2394
2395pub async fn sync(path: &Path, _json: bool) -> Result<()> {
2396 let shard_dir = path.join(".shard");
2397 if !shard_dir.exists() {
2398 anyhow::bail!("not a shard repository (run `shard init` first)");
2399 }
2400
2401 let config = load_config(&shard_dir)?;
2402 let repo_id = config.get("repo_id").cloned().unwrap_or_default();
2403 let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2404 let rate_limit_max = config::config_get_rate_limit_max(&config);
2405 let _rate_limit_window = config::config_get_rate_limit_window(&config);
2406 let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2407 let repo_topic =
2408 shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
2409
2410 let mut node = shard_net::p2p::Node::new().await?;
2411 node.subscribe(&ann_topic)?;
2412 node.subscribe(&repo_topic)?;
2413 node.listen("/ip4/0.0.0.0/tcp/0").await?;
2414 let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2415 let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2416
2417 let peers = load_peers(&shard_dir)?;
2419 for peer in peers {
2420 if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2421 let _ = node.swarm.dial(addr);
2422 }
2423 }
2424
2425 node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2427
2428 let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2430 if let Ok(payload) = serde_json::to_vec(ann) {
2431 let _ = node.publish(&ann_topic, payload.clone());
2432 let _ = node.publish(&repo_topic, payload);
2433 }
2434 };
2435
2436 let head_commit = branch::resolve_head(&shard_dir)?.1;
2437
2438 if let Some(ref head) = head_commit {
2440 let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2441 let ann = shard_net::protocol::Announcement {
2442 commit_id: head.clone(),
2443 file_count,
2444 total_size,
2445 repo_name: repo_name.clone(),
2446 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2447 };
2448 publish_ann(&mut node, &ann);
2449 if !_json {
2450 info!("Announced commit {} on sync topic", head)
2451 }
2452 } else if !_json {
2453 info!("No commits to announce");
2454 }
2455
2456 if !_json {
2457 info!("Syncing on topic with peer id: {}", node.local_peer_id());
2458 }
2459 let _ = std::io::stdout().flush();
2460
2461 let store = Store::open(&shard_dir)?;
2462 let mut provider = RepoProvider {
2463 store,
2464 shard_dir: shard_dir.clone(),
2465 };
2466
2467 let mut interval = tokio::time::interval(Duration::from_secs(5));
2468 let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2469 HashMap::new();
2470 let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2471 let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2472 let path_buf = path.to_path_buf();
2473
2474 loop {
2475 tokio::select! {
2476 _ = tokio::signal::ctrl_c() => {
2477 info!("\nSync shutting down...");
2478 break Ok(());
2479 }
2480 _ = interval.tick() => {
2481 request_counts.clear();
2483 announce_counts.clear();
2484 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2485 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2486 let ann = shard_net::protocol::Announcement {
2487 commit_id: head.clone(),
2488 file_count: fc,
2489 total_size: ts,
2490 repo_name: repo_name.clone(),
2491 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2492 };
2493 publish_ann(&mut node, &ann);
2494 info!("Re-announced commit {} on sync topic", head);
2495 }
2496 }
2497 event = node.swarm.select_next_some() => {
2498 match event {
2499 shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2500 info!("Listening on {address:?}");
2501 let _ = std::io::stdout().flush();
2502 }
2503 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2504 shard_net::p2p::ShardBehaviourEvent::Mdns(
2505 shard_net::libp2p::mdns::Event::Discovered(list),
2506 ),
2507 ) => {
2508 for (peer_id, multiaddr) in list {
2509 info!("mDNS discovered: {peer_id} {multiaddr}");
2510 address_book.entry(peer_id).or_default().push(multiaddr.clone());
2511 node.swarm
2512 .behaviour_mut()
2513 .gossipsub
2514 .add_explicit_peer(&peer_id);
2515 node.swarm
2516 .behaviour_mut()
2517 .kademlia
2518 .add_address(&peer_id, multiaddr);
2519 }
2520 }
2521 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2522 shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2523 list,
2524 )),
2525 ) => {
2526 for (peer_id, _multiaddr) in list {
2527 info!("mDNS expired: {peer_id}");
2528 node.swarm
2529 .behaviour_mut()
2530 .gossipsub
2531 .remove_explicit_peer(&peer_id);
2532 }
2533 }
2534 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2535 shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2536 shard_net::libp2p::gossipsub::Event::Message {
2537 propagation_source,
2538 message,
2539 ..
2540 },
2541 ),
2542 ) => {
2543 if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2545 let peer = propagation_source;
2546 let commit_id_owned = ann.commit_id;
2547 info!(
2548 "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2549 peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2550 );
2551 if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2553 if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2554 address_book.entry(peer).or_default().push(addr);
2555 }
2556 }
2557 let rate_key = (peer, commit_id_owned.clone());
2559 if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2560 warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2561 } else {
2562 *announce_counts.entry(rate_key).or_insert(0) += 1;
2563 let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2565 if our_head != commit_id_owned {
2566 let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2567 let reply = shard_net::protocol::Announcement {
2568 commit_id: our_head,
2569 file_count: fc,
2570 total_size: ts,
2571 repo_name: repo_name.clone(),
2572 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2573 };
2574 publish_ann(&mut node, &reply);
2575 }
2576 if let Some(addrs) = address_book.get(&peer) {
2577 if let Some(addr) = addrs.first() {
2578 let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2579 let path_clone = path_buf.clone();
2580 tokio::spawn(async move {
2581 match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2582 Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2583 Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2584 }
2585 });
2586 }
2587 }
2588 }
2589 }
2590 }
2591 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2592 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2593 shard_net::libp2p::request_response::Event::Message { peer, message },
2594 ),
2595 ) => {
2596 if let shard_net::libp2p::request_response::Message::Request {
2597 request, channel, ..
2598 } = message
2599 {
2600 let req_count = request_counts.entry(peer).or_insert(0u32);
2602 *req_count += 1;
2603 if *req_count > rate_limit_max {
2604 warn!("Dropping request from {}: rate limit exceeded", peer);
2605 } else {
2607 info!("Received request from {}", peer);
2608 node.serve_request(&peer, &mut provider, request, channel);
2609 }
2610 } else {
2611 info!("Received Response from {}", peer);
2612 }
2613 }
2614 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2615 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2616 shard_net::libp2p::request_response::Event::OutboundFailure {
2617 peer, error, ..
2618 },
2619 ),
2620 ) => {
2621 error!("Outbound failure to {}: {:?}", peer, error);
2622 }
2623 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2624 shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2625 shard_net::libp2p::request_response::Event::InboundFailure {
2626 peer, error, ..
2627 },
2628 ),
2629 ) => {
2630 error!("Inbound failure from {}: {:?}", peer, error);
2631 }
2632 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2633 shard_net::p2p::ShardBehaviourEvent::Identify(
2634 shard_net::libp2p::identify::Event::Received { peer_id, info },
2635 ),
2636 ) => {
2637 info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2638 for addr in info.listen_addrs {
2639 address_book.entry(peer_id).or_default().push(addr);
2640 }
2641 let _ = std::io::stdout().flush();
2642 }
2643 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2644 shard_net::p2p::ShardBehaviourEvent::Identify(event),
2645 ) => {
2646 info!("Identify event: {:?}", event);
2647 }
2648 shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2649 info!("Connection established with {}", peer_id);
2650 if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2653 address_book.entry(peer_id).or_default().push(address.clone());
2654 }
2655 node.swarm
2656 .behaviour_mut()
2657 .gossipsub
2658 .add_explicit_peer(&peer_id);
2659 if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2661 let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2662 let ann = shard_net::protocol::Announcement {
2663 commit_id: head.clone(),
2664 file_count: fc,
2665 total_size: ts,
2666 repo_name: repo_name.clone(),
2667 peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2668 };
2669 publish_ann(&mut node, &ann);
2670 }
2671 }
2672 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2673 shard_net::p2p::ShardBehaviourEvent::Relay(event),
2674 ) => {
2675 info!("Relay event: {:?}", event);
2676 }
2677 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2678 shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2679 ) => {
2680 info!("DCUtR event: {:?}", event);
2681 }
2682 shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2683 shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2684 ) => {
2685 info!("AutoNAT event: {:?}", event);
2686 }
2687 shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2688 local_addr,
2689 send_back_addr,
2690 ..
2691 } => {
2692 info!(
2693 "Incoming connection from {} to {}",
2694 send_back_addr, local_addr
2695 );
2696 }
2697 e => {
2698 info!("Event: {:?}", e);
2699 }
2700 }
2701 }
2702 }
2703 }
2704}
2705
2706pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2707 let shard_dir = path.join(".shard");
2708 if !shard_dir.exists() {
2711 init(path, "flat", "zstd", "fixed", None, false, false)?;
2712 }
2713
2714 let store = Store::open(&shard_dir)?;
2715 let cipher = maybe_load_cipher(&shard_dir)?;
2716
2717 let mut node = shard_net::p2p::Node::new().await?;
2718 let _ = node.listen("/ip4/0.0.0.0/tcp/0").await;
2719
2720 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2722 let peer_id = match multiaddr.iter().last() {
2723 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2724 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2725 };
2726
2727 if !json {
2729 info!("Pulling commit {} from {}...", commit_id, peer);
2730 }
2731 let commit_data = node
2732 .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2733 .await?;
2734 let hash = blake3::hash(&commit_data);
2735 if hash.to_hex().to_string() != commit_id {
2736 anyhow::bail!("Commit hash mismatch");
2737 }
2738 let chunk = crate::chunker::Chunk {
2739 hash,
2740 data: commit_data.clone(),
2741 offset: 0,
2742 };
2743 store.put_chunk(&chunk)?;
2744
2745 let commit: Commit = metadata::deserialize(&commit_data)?;
2746 if !json {
2747 info!("Got commit: {}", commit.message);
2748 }
2749
2750 let keys_dir = shard_dir.join("keys");
2752 if let Some(kid) = &commit.key_id {
2753 if keys_dir.join("records").exists() {
2754 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2755 let missing_rotations: Vec<&KeyRotation> = chain
2756 .iter()
2757 .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2758 .collect();
2759 if !missing_rotations.is_empty() {
2760 if !json {
2761 info!(
2762 "Fetching {} key rotation records from peer...",
2763 missing_rotations.len()
2764 );
2765 }
2766 let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2767 missing_rotations
2768 .iter()
2769 .map(|r| {
2770 (
2771 r.rotation_id.clone(),
2772 shard_net::protocol::ShardRequest::GetChunk(
2773 r.rotation_id.clone(),
2774 ),
2775 )
2776 })
2777 .collect();
2778 if let Ok(rot_results) = node
2779 .request_parallel(&multiaddr, peer_id, rot_requests)
2780 .await
2781 {
2782 for (rot_id, rot_data) in &rot_results {
2783 let rh = blake3::hash(rot_data);
2784 if rh.to_hex().to_string() != *rot_id {
2785 info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2786 continue;
2787 }
2788 store.put_chunk(&crate::chunker::Chunk {
2789 hash: rh,
2790 data: rot_data.clone(),
2791 offset: 0,
2792 })?;
2793 }
2794 if !json {
2795 info!("Key rotation records synced from peer.");
2796 }
2797 }
2798 }
2799 }
2800 }
2801 }
2802
2803 if let Some(pk_hex) = &commit.public_key {
2805 let pk_bytes = hex::decode(pk_hex)?;
2806 let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2807 let mut config = load_config(&shard_dir)?;
2808 config.insert("repo_id".to_string(), repo_id);
2809 save_config(&shard_dir, &config)?;
2810 }
2811
2812 let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2814 .manifests
2815 .iter()
2816 .map(|id| {
2817 (
2818 id.clone(),
2819 shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2820 )
2821 })
2822 .collect();
2823 let manifest_results = node
2824 .request_parallel(&multiaddr, peer_id, manifest_requests)
2825 .await?;
2826
2827 let mut all_chunk_ids: Vec<String> = Vec::new();
2828 let mut file_manifests: Vec<FileManifest> = Vec::new();
2829 let mut chunk_compression: HashMap<String, String> = HashMap::new();
2831
2832 for (manifest_id, manifest_data) in &manifest_results {
2833 let hash = blake3::hash(manifest_data);
2834 if hash.to_hex().to_string() != *manifest_id {
2835 anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2836 }
2837 let chunk = crate::chunker::Chunk {
2838 hash,
2839 data: manifest_data.clone(),
2840 offset: 0,
2841 };
2842 store.put_chunk(&chunk)?;
2843 let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2844 if !json {
2845 info!(
2846 "Fetching file: {} (compression: {})",
2847 manifest.name, manifest.compression
2848 );
2849 }
2850 for cid in &manifest.chunks {
2851 chunk_compression.insert(cid.clone(), manifest.compression.clone());
2852 }
2853 all_chunk_ids.extend(manifest.chunks.clone());
2854 file_manifests.push(manifest);
2855 }
2856
2857 let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2859 let partial_chunks = partial.list_chunks()?;
2860 let mut recovered = 0usize;
2861 for chunk_id in &partial_chunks {
2862 if let Ok(data) = partial.load_chunk(chunk_id) {
2863 let hash = blake3::hash(&data);
2864 if hash.to_hex().to_string() == *chunk_id {
2865 let chunk = crate::chunker::Chunk {
2867 hash,
2868 data: data.clone(),
2869 offset: 0,
2870 };
2871 if store.put_chunk(&chunk).is_ok() {
2872 recovered += 1;
2873 }
2874 } else {
2875 let _ = partial.remove_chunk(chunk_id);
2877 }
2878 }
2879 }
2880 if recovered > 0 {
2881 info!("Recovered {} chunks from partial transfer", recovered);
2882 }
2883
2884 let needed_chunks: Vec<String> = all_chunk_ids
2886 .into_iter()
2887 .filter(|id| store.get_chunk(id).is_err())
2888 .collect();
2889
2890 if !needed_chunks.is_empty() {
2891 if !json {
2892 info!("Fetching {} chunks...", needed_chunks.len());
2893 }
2894 let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2895 .iter()
2896 .map(|id| {
2897 (
2898 id.clone(),
2899 shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2900 )
2901 })
2902 .collect();
2903 let chunk_results = node
2904 .request_parallel(&multiaddr, peer_id, chunk_requests)
2905 .await?;
2906 for (chunk_id, chunk_data) in &chunk_results {
2907 let compression: Compression = chunk_compression
2909 .get(chunk_id)
2910 .map(|s| s.as_str())
2911 .unwrap_or("none")
2912 .parse()?;
2913 let decrypted = match &cipher {
2915 Some(c) => c.decrypt(chunk_data)?,
2916 None => chunk_data.clone(),
2917 };
2918 let decompressed = compression.decompress(&decrypted)?;
2919 let hash = blake3::hash(&decompressed);
2920 if hash.to_hex().to_string() != *chunk_id {
2921 anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2922 }
2923 let chunk = crate::chunker::Chunk {
2925 hash,
2926 data: chunk_data.clone(),
2927 offset: 0,
2928 };
2929 store.put_chunk(&chunk)?;
2930 partial.save_chunk(chunk_id, chunk_data)?;
2932 }
2933 }
2934
2935 for manifest in &file_manifests {
2937 let compression = manifest.compression.parse::<Compression>()?;
2938 let mut file_data = Vec::new();
2939 for chunk_id in &manifest.chunks {
2940 let stored = store.get_chunk(chunk_id)?;
2941 let decrypted = match &cipher {
2942 Some(c) => c.decrypt(&stored)?,
2943 None => stored,
2944 };
2945 let decompressed = compression.decompress(&decrypted)?;
2946 file_data.extend_from_slice(&decompressed);
2947 }
2948 fs::write(path.join(&manifest.name), file_data)?;
2949 if !json {
2950 info!(
2951 "Reconstructed file: {} ({} bytes)",
2952 manifest.name, manifest.size
2953 );
2954 }
2955 }
2956
2957 partial.cleanup()?;
2959
2960 if json {
2961 info!(
2962 "{}",
2963 serde_json::to_string(&serde_json::json!({
2964 "status": "pull complete",
2965 "commit_id": commit_id,
2966 }))?
2967 );
2968 } else {
2969 info!("Pull complete.");
2970 }
2971 Ok(())
2972}
2973
2974pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2975 let shard_dir = path.join(".shard");
2976 if !shard_dir.exists() {
2977 anyhow::bail!("not a shard repository (run `shard init` first)");
2978 }
2979 let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2980 if json {
2981 info!("{}", serde_json::to_string(&transfers)?);
2982 } else {
2983 if transfers.is_empty() {
2984 info!("No incomplete transfers.");
2985 } else {
2986 for t in &transfers {
2987 info!("Incomplete transfer: {}", t);
2988 }
2989 }
2990 }
2991 Ok(())
2992}
2993
2994pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2995 let shard_dir = path.join(".shard");
2996 if !shard_dir.exists() {
2997 anyhow::bail!("not a shard repository (run `shard init` first)");
2998 }
2999 partial::remove_transfer(&shard_dir, commit_id)?;
3000 info!("Removed transfer tracking for {}", commit_id);
3001 Ok(())
3002}
3003
3004pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
3005 let _guard = OP_QUEUES
3006 .get_or_create(&path.to_path_buf())
3007 .acquire(ops::OpKind::Write, "push".to_string());
3008 let shard_dir = path.join(".shard");
3009 if !shard_dir.exists() {
3010 anyhow::bail!("not a shard repository (run `shard init` first)");
3011 }
3012
3013 let (_, head_id) = branch::resolve_head(&shard_dir)?;
3014 let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
3015
3016 let store = Store::open(&shard_dir)?;
3017
3018 let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
3020 std::collections::BTreeMap::new();
3021
3022 let mut seen = std::collections::HashSet::new();
3024 let mut stack = vec![head_id.clone()];
3025 while let Some(cid) = stack.pop() {
3026 if !seen.insert(cid.clone()) {
3027 continue;
3028 }
3029 if let Ok(data) = store.get_chunk(&cid) {
3030 objects.insert(cid, data.clone());
3031 if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
3032 if let Some(kid) = &commit.key_id {
3034 let keys_dir = shard_dir.join("keys");
3035 if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
3036 for rot in &chain {
3037 let rj = serde_json::to_vec(rot)?;
3038 let rh = blake3::hash(&rj);
3039 if !store.has_chunk(rh.to_hex().as_ref()) {
3040 store.put_chunk(&crate::chunker::Chunk {
3041 hash: rh,
3042 data: rj.clone(),
3043 offset: 0,
3044 })?;
3045 }
3046 objects.insert(rot.rotation_id.clone(), rj);
3047 }
3048 }
3049 }
3050 for mid in &commit.manifests {
3051 if let Ok(manifest_data) = store.get_chunk(mid) {
3052 objects.insert(mid.clone(), manifest_data.clone());
3053 if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
3054 {
3055 for cid in &manifest.chunks {
3056 if let Ok(chunk_data) = store.get_chunk(cid) {
3057 objects.insert(cid.clone(), chunk_data);
3058 }
3059 }
3060 }
3061 }
3062 }
3063 for parent in &commit.parents {
3064 stack.push(parent.clone());
3065 }
3066 }
3067 }
3068 }
3069
3070 if !json {
3071 info!(
3072 "Pushing {} objects ({} bytes)...",
3073 objects.len(),
3074 objects.values().map(|v| v.len() as u64).sum::<u64>()
3075 );
3076 }
3077
3078 let mut node = shard_net::p2p::Node::new().await?;
3080 let _ = node.listen("/ip4/0.0.0.0/tcp/0").await;
3081 let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
3082 let peer_id = match multiaddr.iter().last() {
3083 Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
3084 _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
3085 };
3086
3087 for (id, data) in &objects {
3088 node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
3089 .await?;
3090 }
3091
3092 if json {
3093 info!(
3094 "{}",
3095 serde_json::to_string(&serde_json::json!({
3096 "status": "push complete",
3097 "objects": objects.len(),
3098 }))?
3099 );
3100 } else {
3101 info!("Push complete ({} objects).", objects.len());
3102 }
3103 Ok(())
3104}
3105
3106pub fn key_rotate(path: &Path) -> Result<()> {
3109 let shard_dir = path.join(".shard");
3110 if !shard_dir.exists() {
3111 anyhow::bail!("not a shard repository (run `shard init` first)");
3112 }
3113 let keys_dir = shard_dir.join("keys");
3114 let rotation = keychain::rotate_signing_key(&keys_dir)?;
3115
3116 if let Some(global) = global_keys_dir() {
3118 fs::create_dir_all(&global).ok();
3119 if let Ok(new_keys) = KeyPair::load(&keys_dir) {
3120 let _ = new_keys.save(&global);
3121 }
3122 }
3123
3124 let store = Store::open(&shard_dir)?;
3126 let json = serde_json::to_vec(&rotation)?;
3127 let hash = blake3::hash(&json);
3128 if !store.has_chunk(hash.to_hex().as_ref()) {
3129 store.put_chunk(&crate::chunker::Chunk {
3130 hash,
3131 data: json,
3132 offset: 0,
3133 })?;
3134 }
3135
3136 let rotations = keychain::load_rotations(&keys_dir)?;
3138 for rot in &rotations {
3139 let rj = serde_json::to_vec(rot)?;
3140 let rh = blake3::hash(&rj);
3141 if !store.has_chunk(rh.to_hex().as_ref()) {
3142 store.put_chunk(&crate::chunker::Chunk {
3143 hash: rh,
3144 data: rj,
3145 offset: 0,
3146 })?;
3147 }
3148 }
3149
3150 info!(
3151 "Key rotated: {} -> {}",
3152 rotation.old_key_id, rotation.new_key_id
3153 );
3154 info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
3155 Ok(())
3156}
3157
3158pub fn key_list(path: &Path, json: bool) -> Result<()> {
3160 let shard_dir = path.join(".shard");
3161 if !shard_dir.exists() {
3162 anyhow::bail!("not a shard repository (run `shard init` first)");
3163 }
3164 let keys_dir = shard_dir.join("keys");
3165 let records = keychain::load_records(&keys_dir)?;
3166 let current_id = keychain::get_current_key_id(&keys_dir)?;
3167
3168 if json {
3169 info!(
3170 "{}",
3171 serde_json::to_string_pretty(&serde_json::json!({
3172 "current": current_id,
3173 "records": &records,
3174 }))?
3175 );
3176 } else {
3177 info!("Current key: {}", current_id);
3178 info!("Key history:");
3179 for record in &records {
3180 let marker = if record.key_id == current_id {
3181 " (active)"
3182 } else {
3183 ""
3184 };
3185 info!(
3186 " {} created_at={}{}",
3187 record.key_id, record.created_at, marker
3188 );
3189 if let Some(prev) = &record.previous_key_id {
3190 info!(" previous: {}", prev);
3191 }
3192 }
3193 }
3194 Ok(())
3195}
3196
3197pub fn key_verify(path: &Path, json: bool) -> Result<()> {
3199 let shard_dir = path.join(".shard");
3200 if !shard_dir.exists() {
3201 anyhow::bail!("not a shard repository (run `shard init` first)");
3202 }
3203 let keys_dir = shard_dir.join("keys");
3204 let errors = keychain::verify_keychain(&keys_dir)?;
3205
3206 if json {
3207 info!(
3208 "{}",
3209 serde_json::to_string(&serde_json::json!({
3210 "verified": errors.is_empty(),
3211 "errors": errors,
3212 }))?
3213 );
3214 } else {
3215 if errors.is_empty() {
3216 info!("Keychain verification successful.");
3217 } else {
3218 for err in &errors {
3219 error!("Keychain error: {}", err);
3220 }
3221 anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
3222 }
3223 }
3224 Ok(())
3225}