Skip to main content

shard_core/
lib.rs

1pub mod api;
2pub mod branch;
3pub mod chunker;
4pub mod commit;
5pub mod compression;
6pub mod config;
7pub mod encryption;
8pub mod gc;
9pub mod index;
10pub mod keychain;
11pub mod manifest;
12pub mod metadata;
13pub mod metrics;
14pub mod ops;
15pub mod partial;
16pub mod store;
17pub mod wal;
18
19use crate::commit::Commit;
20use crate::compression::Compression;
21use crate::index::Index;
22use crate::keychain::KeyRotation;
23use crate::manifest::FileManifest;
24use crate::store::Store;
25use anyhow::Result;
26use ed25519_dalek::{Signer, Verifier};
27use metadata::MetadataFormat;
28use serde::Serialize;
29use shard_crypto::KeyPair;
30use shard_net::libp2p::futures::StreamExt;
31use similar::TextDiff;
32use std::collections::HashMap;
33use std::fs;
34use std::io::Write;
35use std::path::Path;
36use std::path::PathBuf;
37use std::sync::Mutex;
38use std::time::{Duration, SystemTime, UNIX_EPOCH};
39use tracing::{error, info, warn};
40
41static PASSPHRASE_CACHE: Mutex<Option<String>> = Mutex::new(None);
42
43static OP_QUEUES: once_cell::sync::Lazy<ops::RepoOpQueues> =
44    once_cell::sync::Lazy::new(ops::RepoOpQueues::new);
45
46#[allow(dead_code)]
47fn with_write_op<T>(path: &Path, desc: &str, f: impl FnOnce() -> Result<T>) -> Result<T> {
48    let trace_id = ops::generate_trace_id();
49    ops::set_trace_id(&trace_id);
50    let guard = OP_QUEUES
51        .get_or_create(&path.to_path_buf())
52        .acquire(ops::OpKind::Write, desc.to_string());
53    OP_QUEUES
54        .get_or_create(&path.to_path_buf())
55        .wait_for_turn(&guard);
56    let result = f();
57    if let Err(ref e) = result {
58        crate::metrics::METRICS
59            .errors_total
60            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
61        ops::traced_warn(format!("op {} failed: {}", desc, e));
62    }
63    drop(guard);
64    ops::set_trace_id("");
65    result
66}
67
68#[allow(dead_code)]
69fn with_read_op<T>(path: &Path, desc: &str, f: impl FnOnce() -> Result<T>) -> Result<T> {
70    let trace_id = ops::generate_trace_id();
71    ops::set_trace_id(&trace_id);
72    let guard = OP_QUEUES
73        .get_or_create(&path.to_path_buf())
74        .acquire(ops::OpKind::Read, desc.to_string());
75    OP_QUEUES
76        .get_or_create(&path.to_path_buf())
77        .wait_for_turn(&guard);
78    let result = f();
79    drop(guard);
80    ops::set_trace_id("");
81    result
82}
83
84pub fn cache_passphrase(passphrase: &str) -> Result<()> {
85    let mut cache = PASSPHRASE_CACHE.lock().unwrap();
86    *cache = Some(passphrase.to_string());
87    Ok(())
88}
89
90fn get_cached_passphrase() -> Option<String> {
91    PASSPHRASE_CACHE.lock().unwrap().clone()
92}
93
/// Reads `.shardignore` from the repository root `path`.
///
/// Returns one trimmed pattern per line, skipping blank lines and lines whose
/// trimmed form starts with `#`. A missing or unreadable file yields no
/// patterns.
fn load_shardignore(path: &Path) -> Vec<String> {
    match fs::read_to_string(path.join(".shardignore")) {
        Ok(contents) => contents
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty() && !line.starts_with('#'))
            .map(str::to_string)
            .collect(),
        Err(_) => Vec::new(),
    }
}
107
/// Returns `true` when `path` matches any `.shardignore` pattern in `patterns`.
///
/// `path` is compared component by component, so separators are handled
/// uniformly across platforms. Supported pattern forms (a trailing `/` is
/// ignored):
///
/// - `*` matches everything.
/// - `name` or `dir/name` matches when the path *ends with* those components.
/// - `/name` is anchored: it matches only when the whole path equals those
///   components.
/// - `**/name` matches those components as a contiguous run anywhere in the
///   path. It matches whole components only, never substrings.
/// - `*suffix` (e.g. `*.log`, also `**/*.log` and `/*.log`) matches when the
///   final component ends with `suffix`. The anchored form only matches at the
///   top level.
///
/// Patterns that are empty after stripping slashes never match.
fn matches_ignore(path: &Path, patterns: &[String]) -> bool {
    let parts: Vec<String> = path
        .components()
        .map(|c| c.as_os_str().to_string_lossy().into_owned())
        .collect();
    patterns
        .iter()
        .any(|pattern| ignore_pattern_matches(&parts, pattern))
}

/// Tests a single `.shardignore` pattern against a path split into components.
fn ignore_pattern_matches(parts: &[String], pattern: &str) -> bool {
    let pattern = pattern.trim_end_matches('/');
    if pattern == "*" {
        return true;
    }
    let (body, anywhere, anchored) = if let Some(rest) = pattern.strip_prefix("**/") {
        (rest, true, false)
    } else if let Some(rest) = pattern.strip_prefix('/') {
        (rest, false, true)
    } else {
        (pattern, false, false)
    };

    // `*suffix`: a suffix test on the final component (the file or dir name).
    if let Some(suffix) = body.strip_prefix('*') {
        let name_matches = matches!(parts.last(), Some(name) if name.ends_with(suffix));
        return name_matches && (!anchored || parts.len() == 1);
    }

    let needle: Vec<&str> = body.split('/').filter(|s| !s.is_empty()).collect();
    if needle.is_empty() || needle.len() > parts.len() {
        return false;
    }
    let same = |window: &[String]| {
        window
            .iter()
            .zip(&needle)
            .all(|(have, want)| have.as_str() == *want)
    };
    if anywhere {
        parts.windows(needle.len()).any(same)
    } else if anchored {
        parts.len() == needle.len() && same(parts)
    } else {
        same(&parts[parts.len() - needle.len()..])
    }
}
125
126pub fn init(
127    path: &Path,
128    backend: &str,
129    compression_algo: &str,
130    chunker_mode: &str,
131    chunk_size: Option<u64>,
132    is_private: bool,
133    json: bool,
134) -> Result<()> {
135    init_with_passphrase(
136        path,
137        backend,
138        compression_algo,
139        chunker_mode,
140        chunk_size,
141        is_private,
142        json,
143        "",
144    )
145}
146
147#[allow(clippy::too_many_arguments)]
148pub fn init_with_passphrase(
149    path: &Path,
150    backend: &str,
151    compression_algo: &str,
152    chunker_mode: &str,
153    chunk_size: Option<u64>,
154    is_private: bool,
155    json: bool,
156    passphrase: &str,
157) -> Result<()> {
158    crate::metrics::METRICS
159        .ops_init
160        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
161    let _guard = OP_QUEUES
162        .get_or_create(&path.to_path_buf())
163        .acquire(ops::OpKind::Write, "init".to_string());
164    let shard_dir = path.join(".shard");
165    if shard_dir.exists() {
166        anyhow::bail!(
167            "repository already initialized at {} (run `shard status` to confirm)",
168            shard_dir.display()
169        );
170    }
171    fs::create_dir_all(shard_dir.join("objects"))?;
172    fs::create_dir_all(shard_dir.join("keys"))?;
173    fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
174    branch::set_head_branch(&shard_dir, "main")?;
175
176    let keys = KeyPair::generate();
177    keys.save_with_passphrase(&shard_dir.join("keys"), passphrase)?;
178    if !passphrase.is_empty() {
179        cache_passphrase(passphrase)?;
180    }
181    if let Some(global) = global_keys_dir() {
182        fs::create_dir_all(&global).ok();
183        let _ = keys.save_with_passphrase(&global, passphrase);
184        let _ = keychain::init_keychain(&global);
185    }
186    keychain::init_keychain(&shard_dir.join("keys"))?;
187
188    // Generate a deterministic repo identity from the public key
189    // (same key = same repo_id, so clones share the gossipsub topic)
190    let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
191    let repo_id = blake3::hash(&pubkey).to_hex().to_string();
192    let mut config = load_config(&shard_dir)?;
193
194    if is_private {
195        let key = encryption::generate_repo_key();
196        encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
197        config.insert("private".to_string(), "true".to_string());
198    }
199    config.insert("repo_id".to_string(), repo_id);
200    config.insert("storage_backend".to_string(), backend.to_string());
201    config.insert(
202        "serialization_format".to_string(),
203        MetadataFormat::Json.config_value().to_string(),
204    );
205    config.insert("compression".to_string(), compression_algo.to_string());
206    config.insert("chunker_mode".to_string(), chunker_mode.to_string());
207    match chunker_mode {
208        "rabin" => {
209            let chunk_size = chunk_size.unwrap_or(4_194_304);
210            let min = chunk_size / 4;
211            let max = chunk_size * 2;
212            config.insert("chunk_min".to_string(), min.to_string());
213            config.insert("chunk_avg".to_string(), chunk_size.to_string());
214            config.insert("chunk_max".to_string(), max.to_string());
215        }
216        _ => {
217            let cs = chunk_size.unwrap_or(4_194_304);
218            config.insert("chunk_size".to_string(), cs.to_string());
219        }
220    }
221    save_config(&shard_dir, &config)?;
222
223    let chunker_desc = if chunker_mode == "rabin" {
224        format!(
225            "rabin (avg {} bytes)",
226            config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
227        )
228    } else {
229        format!(
230            "fixed ({} bytes)",
231            config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
232        )
233    };
234    if json {
235        info!(
236            "{}",
237            serde_json::to_string(&serde_json::json!({
238                "path": shard_dir.display().to_string(),
239                "backend": backend,
240                "compression": compression_algo,
241                "chunker": chunker_desc,
242                "private": is_private,
243            }))?
244        );
245    } else {
246        info!(
247            "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
248            shard_dir.display(),
249            backend,
250            compression_algo,
251            chunker_desc,
252        );
253    }
254    Ok(())
255}
256
/// Returns `file_path` relative to `repo_root` as a string.
///
/// Both paths are canonicalized when possible, falling back to the path as
/// given. If the file is not under the repository root, only its file name is
/// returned, or an empty string when it has none.
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let canonical = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let root = canonical(repo_root);
    let target = canonical(file_path);

    match target.strip_prefix(&root) {
        Ok(inside) => inside.to_string_lossy().into_owned(),
        Err(_) => match file_path.file_name() {
            Some(name) => name.to_string_lossy().into_owned(),
            None => String::new(),
        },
    }
}
273
/// Guesses a MIME type from `file_path`'s extension (matched case-insensitively).
///
/// Returns `None` when there is no extension, it is not valid UTF-8, or it is
/// not in the known table.
fn detect_content_type(file_path: &Path) -> Option<String> {
    const MIME_BY_EXT: &[(&str, &str)] = &[
        ("txt", "text/plain"),
        ("json", "application/json"),
        ("csv", "text/csv"),
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("pdf", "application/pdf"),
        ("yaml", "application/x-yaml"),
        ("yml", "application/x-yaml"),
        ("md", "text/markdown"),
        ("html", "text/html"),
        ("htm", "text/html"),
        ("py", "text/x-python"),
        ("rs", "text/x-rust"),
        ("ts", "text/x-typescript"),
        ("js", "application/javascript"),
        ("wasm", "application/wasm"),
        ("toml", "application/toml"),
        ("xml", "application/xml"),
        ("zip", "application/zip"),
        ("tar", "application/x-tar"),
        ("gz", "application/gzip"),
        ("bin", "application/octet-stream"),
        ("pt", "application/x-model"),
        ("pth", "application/x-model"),
        ("ckpt", "application/x-model"),
        ("safetensors", "application/x-model"),
    ];

    let ext = file_path.extension()?.to_str()?.to_lowercase();
    MIME_BY_EXT
        .iter()
        .find(|&&(candidate, _)| candidate == ext)
        .map(|&(_, mime)| mime.to_string())
}
303
304#[allow(clippy::too_many_arguments)]
305fn add_file(
306    repo_root: &Path,
307    file_path: &Path,
308    store: &Store,
309    index: &mut Index,
310    compression: &Compression,
311    chunker_mode: &chunker::ChunkerMode,
312    cipher: Option<&encryption::RepoCipher>,
313    _json: bool,
314) -> Result<()> {
315    let file = fs::File::open(file_path)?;
316    let mut chunker = match chunker_mode {
317        chunker::ChunkerMode::Fixed { chunk_size } => {
318            chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
319        }
320        chunker::ChunkerMode::Rabin { min, avg, max } => {
321            chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
322        }
323    };
324    let mut chunk_hashes = Vec::new();
325    let mut total_size = 0;
326
327    while let Some(chunk) = chunker.next_chunk()? {
328        let hash = chunk.hash;
329        let compressed_data = compression.compress(&chunk.data)?;
330        let stored_data = match cipher {
331            Some(c) => c.encrypt(&compressed_data),
332            None => compressed_data,
333        };
334        let stored = crate::chunker::Chunk {
335            hash,
336            data: stored_data,
337            offset: chunk.offset,
338        };
339        store.put_chunk(&stored)?;
340        chunk_hashes.push(hash.to_hex().to_string());
341        total_size += chunk.data.len() as u64;
342    }
343
344    let name = relative_path(repo_root, file_path);
345    let manifest = FileManifest {
346        name: name.clone(),
347        size: total_size,
348        chunks: chunk_hashes.clone(),
349        content_type: detect_content_type(file_path),
350        compression: compression.as_str().to_string(),
351        merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
352        created_by: None,
353        created_at: None,
354        signature: None,
355    };
356
357    index.files.insert(name.clone(), manifest);
358    if !_json {
359        info!("Added {} ({})", name, total_size);
360    }
361    Ok(())
362}
363
364pub fn recover(path: &Path, json: bool) -> Result<()> {
365    crate::metrics::METRICS
366        .ops_recover
367        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
368    let shard_dir = path.join(".shard");
369    if !shard_dir.exists() {
370        anyhow::bail!("not a shard repository (run `shard init` first)");
371    }
372    wal::recover(&shard_dir)?;
373    if json {
374        info!(
375            "{}",
376            serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
377        );
378    } else {
379        info!("Recovery complete.");
380    }
381    Ok(())
382}
383
384pub fn health(path: &Path, json: bool) -> Result<()> {
385    use crate::metrics::METRICS;
386    METRICS
387        .ops_health
388        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
389
390    let shard_dir = path.join(".shard");
391    if !shard_dir.exists() {
392        anyhow::bail!("not a shard repository (run `shard init` first)");
393    }
394
395    let mut issues: Vec<String> = Vec::new();
396
397    let objects_ok = shard_dir.join("objects").exists();
398    if !objects_ok {
399        issues.push("objects directory missing".to_string());
400    }
401
402    let config_ok = shard_dir.join("config.json").exists();
403    let _config: Option<std::collections::BTreeMap<String, String>> = if config_ok {
404        match load_config(&shard_dir) {
405            Ok(c) => Some(c),
406            Err(e) => {
407                issues.push(format!("config.json corrupt: {}", e));
408                None
409            }
410        }
411    } else {
412        issues.push("config.json missing".to_string());
413        None
414    };
415
416    let keys_dir = shard_dir.join("keys");
417    let keys_ok = keys_dir.join("secret.key").exists() && keys_dir.join("public.key").exists();
418    if !keys_ok {
419        issues.push("keys missing (secret.key and/or public.key)".to_string());
420    }
421
422    let wal_has_entries = shard_dir.join("wal.log").exists() && {
423        std::fs::metadata(shard_dir.join("wal.log"))
424            .map(|m| m.len() > 0)
425            .unwrap_or(false)
426    };
427
428    let store_result = Store::open(&shard_dir);
429    let object_count = match &store_result {
430        Ok(store) => store.iter_chunks().map(|c| c.len() as u64).unwrap_or(0),
431        Err(e) => {
432            issues.push(format!("store error: {}", e));
433            0
434        }
435    };
436
437    let (branch, head) = branch::resolve_head(&shard_dir)
438        .ok()
439        .unwrap_or((None, None));
440
441    let branch_count = branch::list_branches(&shard_dir)
442        .map(|(_, b)| b.len() as u64)
443        .unwrap_or(0);
444
445    let tags_count = load_tags(&shard_dir).map(|t| t.len() as u64).unwrap_or(0);
446
447    let disk_usage = dir_size(&shard_dir);
448
449    let key_valid = load_keypair(&shard_dir).is_ok();
450
451    let metrics_snapshot = crate::metrics::METRICS.snapshot();
452
453    if json {
454        info!(
455            "{}",
456            serde_json::to_string(&serde_json::json!({
457                "status": if issues.is_empty() { "healthy" } else { "degraded" },
458                "shard_dir": shard_dir.display().to_string(),
459                "objects_ok": objects_ok,
460                "config_ok": config_ok,
461                "keys_ok": keys_ok && key_valid,
462                "wal_pending": wal_has_entries,
463                "object_count": object_count,
464                "current_branch": branch,
465                "head_commit": head,
466                "branch_count": branch_count,
467                "tag_count": tags_count,
468                "disk_usage_bytes": disk_usage,
469                "issues": issues,
470                "metrics": metrics_snapshot,
471            }))?
472        );
473    } else {
474        if issues.is_empty() {
475            info!("Health: OK");
476        } else {
477            info!("Health: DEGRADED");
478            for issue in &issues {
479                warn!("  - {}", issue);
480            }
481        }
482        info!("  Objects: {}", object_count);
483        info!("  Branches: {}, Tags: {}", branch_count, tags_count);
484        info!("  Current branch: {:?}", branch);
485        info!("  HEAD: {:?}", head);
486        info!("  WAL pending: {}", wal_has_entries);
487        info!("  Disk usage: {} bytes", disk_usage);
488    }
489
490    if !issues.is_empty() {
491        anyhow::bail!("health check found {} issue(s)", issues.len());
492    }
493    Ok(())
494}
495
496fn dir_size(path: &Path) -> u64 {
497    walkdir::WalkDir::new(path)
498        .into_iter()
499        .filter_map(|e| e.ok())
500        .filter(|e| e.file_type().is_file())
501        .filter_map(|e| e.metadata().ok())
502        .map(|m| m.len())
503        .sum()
504}
505
506pub fn get_metrics() -> metrics::MetricsSnapshot {
507    crate::metrics::METRICS.snapshot()
508}
509
510pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
511    crate::metrics::METRICS
512        .ops_add
513        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
514    let _guard = OP_QUEUES
515        .get_or_create(&path.to_path_buf())
516        .acquire(ops::OpKind::Write, "add".to_string());
517    let shard_dir = path.join(".shard");
518    if !shard_dir.exists() {
519        anyhow::bail!("not a shard repository (run `shard init` first)");
520    }
521
522    wal::recover(&shard_dir)?;
523
524    let config = load_config(&shard_dir)?;
525    let compression: Compression = config
526        .get("compression")
527        .map(|s| s.as_str())
528        .unwrap_or("zstd")
529        .parse()?;
530
531    let chunker_mode = chunker::ChunkerMode::from_config(&config);
532    let fmt = MetadataFormat::from_config(&config);
533
534    let store = Store::open(&shard_dir)?;
535    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
536
537    let cipher = maybe_load_cipher(&shard_dir)?;
538    let ignore_patterns = load_shardignore(path);
539
540    if file_path.is_dir() {
541        for entry in walkdir::WalkDir::new(file_path)
542            .into_iter()
543            .filter_entry(|e| {
544                let name = e.file_name().to_string_lossy();
545                if name == ".shard" || name == ".git" {
546                    return false;
547                }
548                let rel = e.path().strip_prefix(file_path).unwrap_or(e.path());
549                if matches_ignore(rel, &ignore_patterns) {
550                    return false;
551                }
552                true
553            })
554        {
555            let entry = entry?;
556            if entry.file_type().is_file() {
557                add_file(
558                    path,
559                    entry.path(),
560                    &store,
561                    &mut index,
562                    &compression,
563                    &chunker_mode,
564                    cipher.as_ref(),
565                    json,
566                )?;
567            }
568        }
569    } else {
570        add_file(
571            path,
572            file_path,
573            &store,
574            &mut index,
575            &compression,
576            &chunker_mode,
577            cipher.as_ref(),
578            json,
579        )?;
580    }
581
582    index.save(&shard_dir.join("index"), &fmt)?;
583    if json {
584        info!(
585            "{}",
586            serde_json::to_string(&serde_json::json!({"status": "added"}))?
587        );
588    }
589    Ok(())
590}
591
592pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
593    let _guard = OP_QUEUES
594        .get_or_create(&path.to_path_buf())
595        .acquire(ops::OpKind::Write, "commit".to_string());
596    let shard_dir = path.join(".shard");
597    if !shard_dir.exists() {
598        anyhow::bail!("not a shard repository (run `shard init` first)");
599    }
600
601    // Recover from any previous crash before mutating
602    wal::recover(&shard_dir)?;
603
604    let config = load_config(&shard_dir)?;
605    let fmt = MetadataFormat::from_config(&config);
606
607    let store = Store::open(&shard_dir)?;
608    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
609
610    if index.files.is_empty() {
611        anyhow::bail!("nothing to commit (stage files with `shard add` first)");
612    }
613
614    let head_path = shard_dir.join("HEAD");
615
616    // WAL: back up pre-commit state
617    let wal = wal::Wal::new(&shard_dir);
618    let head_backup = fs::read_to_string(&head_path).ok();
619    let index_backup = fs::read(shard_dir.join("index"))?;
620    wal.append(&wal::WalEntry::CommitBegin {
621        head_backup,
622        index_backup,
623    })?;
624
625    // 1. Store manifests (signed)
626    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
627    let keys = load_keypair(&shard_dir)?;
628    let signing_key = keys.signing_key;
629    let mut manifest_ids = Vec::new();
630    for manifest in index.files.values_mut() {
631        manifest.created_by = Some(author.to_string());
632        manifest.created_at = Some(timestamp);
633
634        let mut unsigned = manifest.clone();
635        unsigned.signature = None;
636        let canonical = metadata::serialize_for_signing(&unsigned);
637        let sig = signing_key.sign(&canonical);
638        manifest.signature = Some(hex::encode(sig.to_bytes()));
639
640        let encoded = metadata::serialize(manifest, &fmt);
641        let hash = blake3::hash(&encoded);
642        let chunk = crate::chunker::Chunk {
643            hash,
644            data: encoded,
645            offset: 0,
646        };
647        store.put_chunk(&chunk)?;
648        manifest_ids.push(hash.to_hex().to_string());
649    }
650    manifest_ids.sort();
651
652    // 2. Get parent
653    let mut parents = Vec::new();
654    let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
655    if let Some(pid) = parent_id {
656        parents.push(pid);
657    }
658
659    // 3. Create commit
660    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
661    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
662    let mut commit = Commit {
663        commit_id: String::new(),
664        parents,
665        manifests: manifest_ids,
666        author: author.to_string(),
667        message: message.to_string(),
668        timestamp,
669        public_key: Some(public_key_hex),
670        signature: None,
671        key_id,
672    };
673
674    // 4. Sign — always JSON for deterministic signature
675    let json_unsigned = metadata::serialize_for_signing(&commit);
676    let signature = signing_key.sign(&json_unsigned);
677    commit.signature = Some(hex::encode(signature.to_bytes()));
678
679    // 5. Store commit — use configured format
680    let encoded = metadata::serialize(&commit, &fmt);
681    let hash = blake3::hash(&encoded);
682    let chunk = crate::chunker::Chunk {
683        hash,
684        data: encoded,
685        offset: 0,
686    };
687    store.put_chunk(&chunk)?;
688
689    // 6. Cycle detection: verify no parent chain already contains this commit
690    let commit_id = hash.to_hex().to_string();
691    if has_dag_cycle(&store, &commit.parents, &commit_id)? {
692        anyhow::bail!(
693            "Cycle detected: commit {} is already an ancestor of one or more parents",
694            commit_id
695        );
696    }
697
698    // 7. Update HEAD and branch ref
699    if let Some(ref branch_name) = current_branch {
700        branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
701        branch::set_head_branch(&shard_dir, branch_name)?;
702    } else {
703        fs::write(&head_path, &commit_id)?;
704    }
705
706    // 7. Clear index
707    index.files.clear();
708    index.save(&shard_dir.join("index"), &fmt)?;
709
710    // WAL: mark commit complete
711    wal.append(&wal::WalEntry::CommitEnd)?;
712    wal.truncate()?;
713
714    if json {
715        info!(
716            "{}",
717            serde_json::to_string(&serde_json::json!({
718                "commit_id": commit_id,
719                "message": message,
720            }))?
721        );
722    } else {
723        info!("Committed {} ({})", commit_id, message);
724    }
725    Ok(())
726}
727
728pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
729    let _guard = OP_QUEUES
730        .get_or_create(&path.to_path_buf())
731        .acquire(ops::OpKind::Read, "verify".to_string());
732    let shard_dir = path.join(".shard");
733    if !shard_dir.exists() {
734        anyhow::bail!("not a shard repository (run `shard init` first)");
735    }
736
737    if commit_id.len() < 2 {
738        anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
739    }
740    let store = Store::open(&shard_dir)?;
741    let cipher = maybe_load_cipher(&shard_dir)?;
742    let commit_data = store.get_chunk(commit_id)?;
743
744    // Self-verify: stored blob hash must equal commit_id (M5)
745    let stored_hash = blake3::hash(&commit_data);
746    if stored_hash.to_hex().to_string() != commit_id {
747        anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
748    }
749
750    let commit: Commit = metadata::deserialize(&commit_data)?;
751
752    let mut sig_verified = false;
753    let mut files_checked = 0u64;
754
755    // Verify the signing key was valid at commit time
756    if let Some(kid) = &commit.key_id {
757        if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
758            anyhow::bail!("Keychain verification failed: {}", e);
759        } else if !json {
760            info!("Keychain: key {} was active at commit time.", kid);
761        }
762    }
763
764    if let Some(sig_hex) = &commit.signature {
765        let verifying_key = if let Some(pk_hex) = &commit.public_key {
766            let pk_bytes = hex::decode(pk_hex)?;
767            ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
768        } else {
769            let pub_bytes = load_public_key(&shard_dir)?;
770            ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
771        };
772
773        let mut unsigned_commit = commit.clone();
774        unsigned_commit.signature = None;
775        let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);
776
777        let sig_bytes = hex::decode(sig_hex)?;
778        let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
779
780        verifying_key.verify(&json_unsigned, &signature)?;
781        sig_verified = true;
782        if !json {
783            info!("Signature verified.");
784        }
785    } else if !json {
786        info!("Warning: Commit is unsigned.");
787    }
788
789    for manifest_id in &commit.manifests {
790        let manifest_data = store.get_chunk(manifest_id)?;
791        let hash = blake3::hash(&manifest_data);
792        if hash.to_hex().to_string() != *manifest_id {
793            anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
794        }
795
796        let manifest: FileManifest = metadata::deserialize(&manifest_data)?;
797
798        // Verify manifest signature (defense-in-depth; commit signature already covers manifest_id)
799        if let Some(sig_hex) = &manifest.signature {
800            let pk_bytes = if let Some(pk_hex) = &commit.public_key {
801                hex::decode(pk_hex)?
802            } else {
803                load_public_key(&shard_dir)?
804            };
805            let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
806            let mut unsigned = manifest.clone();
807            unsigned.signature = None;
808            let canonical = metadata::serialize_for_signing(&unsigned);
809            let sig_bytes = hex::decode(sig_hex)?;
810            let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
811            vk.verify(&canonical, &sig)?;
812            if !json {
813                info!("  Manifest signature verified for: {}", manifest.name);
814            }
815        }
816
817        let compression = manifest.compression.parse::<Compression>()?;
818        if !json {
819            info!(
820                "Verifying file: {} (compression: {})",
821                manifest.name, manifest.compression
822            );
823        }
824
825        // Verify merkle_root if present
826        if let Some(ref mr) = manifest.merkle_root {
827            let computed = FileManifest::merkle_root(&manifest.chunks);
828            if mr != &computed {
829                anyhow::bail!(
830                    "merkle root mismatch for '{}': manifest says {} but computed {}",
831                    manifest.name,
832                    mr,
833                    computed
834                );
835            }
836        }
837
838        for chunk_id in &manifest.chunks {
839            let chunk_data = store.get_chunk(chunk_id)?;
840            let decrypted = match &cipher {
841                Some(c) => c.decrypt(&chunk_data)?,
842                None => chunk_data,
843            };
844            let decompressed = compression.decompress(&decrypted)?;
845            let hash = blake3::hash(&decompressed);
846            if hash.to_hex().to_string() != *chunk_id {
847                anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
848            }
849        }
850        files_checked += 1;
851    }
852
853    if json {
854        info!(
855            "{}",
856            serde_json::to_string(&serde_json::json!({
857                "commit_id": commit_id,
858                "verified": true,
859                "signature_verified": sig_verified,
860                "files_checked": files_checked,
861            }))?
862        );
863    } else {
864        info!("Verification successful.");
865    }
866    Ok(())
867}
868
869fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
870    if commit_id.len() < 2 {
871        anyhow::bail!(
872            "commit id too short (got {} chars, need at least 2): '{}'",
873            commit_id.len(),
874            commit_id
875        );
876    };
877    let data = store.get_chunk(commit_id)?;
878    let mut commit: Commit = metadata::deserialize(&data)?;
879    commit.commit_id = commit_id.to_string();
880    Ok(commit)
881}
882
883fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
884    let mut seen = std::collections::HashSet::new();
885    let mut stack: Vec<String> = parents.to_vec();
886    while let Some(cid) = stack.pop() {
887        if cid == commit_id {
888            return Ok(true);
889        }
890        if !seen.insert(cid.clone()) {
891            continue;
892        }
893        if let Ok(data) = store.get_chunk(&cid) {
894            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
895                for p in &commit.parents {
896                    stack.push(p.clone());
897                }
898            }
899        }
900    }
901    Ok(false)
902}
903
904#[derive(Serialize)]
905pub struct LogEntry {
906    pub commit_id: String,
907    pub parents: Vec<String>,
908    pub manifests: Vec<String>,
909    pub author: String,
910    pub message: String,
911    pub timestamp: u64,
912    pub signature: Option<String>,
913}
914
915impl From<Commit> for LogEntry {
916    fn from(c: Commit) -> Self {
917        LogEntry {
918            commit_id: c.commit_id,
919            parents: c.parents,
920            manifests: c.manifests,
921            author: c.author,
922            message: c.message,
923            timestamp: c.timestamp,
924            signature: c.signature,
925        }
926    }
927}
928
929pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
930    let shard_dir = path.join(".shard");
931    if !shard_dir.exists() {
932        anyhow::bail!("not a shard repository (run `shard init` first)");
933    }
934
935    let store = Store::open(&shard_dir)?;
936
937    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
938    let head = head_commit
939        .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
940
941    let mut entries: Vec<LogEntry> = Vec::new();
942    let mut seen = std::collections::HashSet::new();
943    let mut stack = vec![head];
944
945    while let Some(cid) = stack.pop() {
946        if !seen.insert(cid.clone()) {
947            continue;
948        }
949        let commit = load_commit(&store, &cid)?;
950        for parent in &commit.parents {
951            stack.push(parent.clone());
952        }
953        entries.push(commit.into());
954    }
955
956    if json {
957        info!("{}", serde_json::to_string_pretty(&entries)?);
958    } else {
959        for entry in &entries {
960            let ts = {
961                let secs = entry.timestamp as i64;
962                let tm = time::OffsetDateTime::from_unix_timestamp(secs)
963                    .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
964                tm.format(&time::format_description::well_known::Rfc3339)
965                    .unwrap_or_else(|_| entry.timestamp.to_string())
966            };
967            info!("commit {}", entry.commit_id);
968            if !entry.parents.is_empty() {
969                info!("parents: {}", entry.parents.join(" "));
970            }
971            info!("author: {}", entry.author);
972            info!("date:   {}", ts);
973            info!("");
974            for line in entry.message.lines() {
975                info!("    {}", line);
976            }
977            info!("");
978        }
979    }
980
981    Ok(())
982}
983
984fn reconstruct_file(
985    store: &Store,
986    manifest: &FileManifest,
987    cipher: Option<&encryption::RepoCipher>,
988) -> Result<Vec<u8>> {
989    let compression: Compression = manifest.compression.parse()?;
990    let mut data = Vec::new();
991    for chunk_id in &manifest.chunks {
992        let chunk_data = store.get_chunk(chunk_id)?;
993        let decrypted = match cipher {
994            Some(c) => c.decrypt(&chunk_data)?,
995            None => chunk_data,
996        };
997        let decompressed = compression.decompress(&decrypted)?;
998        data.extend_from_slice(&decompressed);
999    }
1000    Ok(data)
1001}
1002
1003pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
1004    let shard_dir = path.join(".shard");
1005    if !shard_dir.exists() {
1006        anyhow::bail!("not a shard repository (run `shard init` first)");
1007    }
1008
1009    let store = Store::open(&shard_dir)?;
1010    let cipher = maybe_load_cipher(&shard_dir)?;
1011
1012    let cid_b = match commit_b {
1013        Some(c) => branch::resolve_rev(&shard_dir, c)?,
1014        None => {
1015            let (_, head) = branch::resolve_head(&shard_dir)?;
1016            head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
1017        }
1018    };
1019    let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;
1020
1021    let commit1 = load_commit(&store, &cid_a)?;
1022    let commit2 = load_commit(&store, &cid_b)?;
1023
1024    let mut files1: HashMap<String, FileManifest> = HashMap::new();
1025    for mid in &commit1.manifests {
1026        let data = store.get_chunk(mid)?;
1027        let m: FileManifest = metadata::deserialize(&data)?;
1028        files1.insert(m.name.clone(), m);
1029    }
1030
1031    let mut files2: HashMap<String, FileManifest> = HashMap::new();
1032    for mid in &commit2.manifests {
1033        let data = store.get_chunk(mid)?;
1034        let m: FileManifest = metadata::deserialize(&data)?;
1035        files2.insert(m.name.clone(), m);
1036    }
1037
1038    let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
1039    all_names.sort();
1040    all_names.dedup();
1041
1042    let mut changes: Vec<serde_json::Value> = Vec::new();
1043    let mut diff_found = false;
1044
1045    for name in all_names {
1046        match (files1.get(name), files2.get(name)) {
1047            (None, Some(manifest)) => {
1048                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
1049                let text = String::from_utf8_lossy(&content);
1050                diff_found = true;
1051                if json {
1052                    changes.push(serde_json::json!({
1053                        "type": "added",
1054                        "file": name,
1055                        "lines": text.lines().collect::<Vec<_>>(),
1056                    }));
1057                } else {
1058                    info!("--- /dev/null");
1059                    info!("+++ b/{}", name);
1060                    let lines: Vec<&str> = text.lines().collect();
1061                    info!("@@ -0,0 +1,{} @@", lines.len());
1062                    for line in &lines {
1063                        info!("+{}", line);
1064                    }
1065                }
1066            }
1067            (Some(manifest), None) => {
1068                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
1069                let text = String::from_utf8_lossy(&content);
1070                diff_found = true;
1071                if json {
1072                    changes.push(serde_json::json!({
1073                        "type": "removed",
1074                        "file": name,
1075                        "lines": text.lines().collect::<Vec<_>>(),
1076                    }));
1077                } else {
1078                    info!("--- a/{}", name);
1079                    info!("+++ /dev/null");
1080                    let lines: Vec<&str> = text.lines().collect();
1081                    info!("@@ -1,{} +0,0 @@", lines.len());
1082                    for line in &lines {
1083                        info!("-{}", line);
1084                    }
1085                }
1086            }
1087            (Some(ma), Some(mb)) => {
1088                if ma.chunks == mb.chunks {
1089                    continue;
1090                }
1091                let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
1092                let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
1093                diff_found = true;
1094                if json {
1095                    let text_a = String::from_utf8_lossy(&content_a);
1096                    let text_b = String::from_utf8_lossy(&content_b);
1097                    changes.push(serde_json::json!({
1098                        "type": "modified",
1099                        "file": name,
1100                        "old_lines": text_a.lines().collect::<Vec<_>>(),
1101                        "new_lines": text_b.lines().collect::<Vec<_>>(),
1102                    }));
1103                } else {
1104                    let text_a = String::from_utf8_lossy(&content_a);
1105                    let text_b = String::from_utf8_lossy(&content_b);
1106                    let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
1107                    let mut buf: Vec<u8> = Vec::new();
1108                    diff.unified_diff()
1109                        .header(&format!("a/{}", name), &format!("b/{}", name))
1110                        .to_writer(&mut buf)
1111                        .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
1112                    let output = String::from_utf8_lossy(&buf);
1113                    for line in output.lines() {
1114                        info!("{}", line);
1115                    }
1116                }
1117            }
1118            (None, None) => {}
1119        }
1120    }
1121
1122    if !diff_found {
1123        if json {
1124            info!(
1125                "{}",
1126                serde_json::to_string(
1127                    &serde_json::json!({"changes": changes, "message": "no differences"})
1128                )?
1129            );
1130        } else {
1131            info!("No differences between the commits.");
1132        }
1133        return Ok(());
1134    }
1135
1136    if json {
1137        info!(
1138            "{}",
1139            serde_json::to_string(&serde_json::json!({"changes": changes}))?
1140        );
1141    }
1142
1143    Ok(())
1144}
1145
1146pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
1147    let _guard = OP_QUEUES
1148        .get_or_create(&path.to_path_buf())
1149        .acquire(ops::OpKind::Write, "checkout".to_string());
1150    let shard_dir = path.join(".shard");
1151    if !shard_dir.exists() {
1152        anyhow::bail!("not a shard repository (run `shard init` first)");
1153    }
1154
1155    let store = Store::open(&shard_dir)?;
1156    let cipher = maybe_load_cipher(&shard_dir)?;
1157
1158    // Resolve target: branch name or commit id
1159    // Validate BEFORE writing to HEAD to avoid corruption on failure
1160    let branch_path = shard_dir.join("refs").join("heads").join(target);
1161    let commit_id = if branch_path.exists() {
1162        fs::read_to_string(&branch_path)?.trim().to_string()
1163    } else {
1164        target.to_string()
1165    };
1166
1167    // Validate commit exists before touching HEAD
1168    let commit = load_commit(&store, &commit_id)?;
1169
1170    // Only update HEAD after validation succeeds
1171    if branch_path.exists() {
1172        branch::set_head_branch(&shard_dir, target)?;
1173    } else {
1174        branch::set_head_commit(&shard_dir, target)?;
1175    }
1176    let mut files = Vec::new();
1177
1178    for manifest_id in &commit.manifests {
1179        let data = store.get_chunk(manifest_id)?;
1180        let hash = blake3::hash(&data);
1181        if hash.to_hex().to_string() != *manifest_id {
1182            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
1183        }
1184        let manifest: FileManifest = metadata::deserialize(&data)?;
1185        let compression = manifest.compression.parse::<Compression>()?;
1186        if !json {
1187            info!(
1188                "Checking out file: {} (compression: {})",
1189                manifest.name, manifest.compression
1190            );
1191        }
1192
1193        let mut file_data = Vec::new();
1194        for chunk_id in &manifest.chunks {
1195            let chunk_data = store.get_chunk(chunk_id)?;
1196            let decrypted = match &cipher {
1197                Some(c) => c.decrypt(&chunk_data)?,
1198                None => chunk_data,
1199            };
1200            let decompressed = compression.decompress(&decrypted)?;
1201            file_data.extend_from_slice(&decompressed);
1202        }
1203        fs::write(path.join(&manifest.name), file_data)?;
1204        if !json {
1205            info!("  -> {}", manifest.name);
1206        }
1207        files.push(manifest.name);
1208    }
1209
1210    if json {
1211        info!(
1212            "{}",
1213            serde_json::to_string(&serde_json::json!({
1214                "commit_id": commit_id,
1215                "files": files,
1216            }))?
1217        );
1218    } else {
1219        info!("Checkout complete.");
1220    }
1221    Ok(())
1222}
1223
1224pub fn status(path: &Path, json: bool) -> Result<()> {
1225    let shard_dir = path.join(".shard");
1226    if !shard_dir.exists() {
1227        anyhow::bail!("not a shard repository (run `shard init` first)");
1228    }
1229
1230    let config = load_config(&shard_dir)?;
1231    let fmt = MetadataFormat::from_config(&config);
1232
1233    let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
1234    let mut commit_id: Option<String> = None;
1235    if let Some(cid) = head_commit {
1236        commit_id = Some(cid);
1237        if !json {
1238            if let Some(ref branch) = current_branch {
1239                info!("On branch: {}", branch);
1240            } else {
1241                info!("HEAD detached at {}", commit_id.as_ref().unwrap());
1242            }
1243        }
1244    } else if !json {
1245        info!("No commits yet.");
1246    }
1247
1248    let index = Index::load(&shard_dir.join("index"), &fmt)?;
1249    let staged: Vec<String> = index.files.keys().cloned().collect();
1250    if !json {
1251        if staged.is_empty() {
1252            info!("Nothing staged.");
1253        } else {
1254            info!("\nStaged files:");
1255            for name in &staged {
1256                info!("  {} (to be committed)", name);
1257            }
1258        }
1259    }
1260
1261    let store = Store::open(&shard_dir)?;
1262    let mut deleted = Vec::new();
1263    let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
1264        let mut names = std::collections::HashSet::new();
1265        if let Ok(commit) = load_commit(&store, head) {
1266            for manifest_id in &commit.manifests {
1267                if let Ok(data) = store.get_chunk(manifest_id) {
1268                    if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1269                        let file_path = path.join(&manifest.name);
1270                        if !file_path.exists() {
1271                            deleted.push(manifest.name.clone());
1272                        }
1273                        names.insert(manifest.name);
1274                    }
1275                }
1276            }
1277        }
1278        names
1279    } else {
1280        std::collections::HashSet::new()
1281    };
1282
1283    if !json && !deleted.is_empty() {
1284        info!("\nDeleted files:");
1285        for name in &deleted {
1286            info!("  {} (deleted)", name);
1287        }
1288    }
1289
1290    let mut untracked = Vec::new();
1291    let ignore_patterns = load_shardignore(path);
1292    for entry in walkdir::WalkDir::new(path)
1293        .min_depth(1)
1294        .into_iter()
1295        .filter_map(|e| e.ok())
1296    {
1297        if entry.file_type().is_file() || entry.file_type().is_dir() {
1298            let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1299            let name = rel_path.to_string_lossy().to_string();
1300            let is_hidden = rel_path.components().any(|c| {
1301                let name = c.as_os_str().to_string_lossy();
1302                name == ".shard" || name == ".git"
1303            });
1304            if !is_hidden
1305                && entry.path() != shard_dir
1306                && !entry.path().starts_with(&shard_dir)
1307                && !index.files.contains_key(&name)
1308                && !tracked_names.contains(&name)
1309                && entry.file_type().is_file()
1310                && !matches_ignore(rel_path, &ignore_patterns)
1311            {
1312                untracked.push(name);
1313            }
1314        }
1315    }
1316    if !json && !untracked.is_empty() {
1317        info!("\nUntracked files:");
1318        for name in &untracked {
1319            info!("  {}", name);
1320        }
1321    }
1322
1323    if json {
1324        info!(
1325            "{}",
1326            serde_json::to_string(&serde_json::json!({
1327                "commit": commit_id,
1328                "staged": staged,
1329                "deleted": deleted,
1330                "untracked": untracked,
1331            }))?
1332        );
1333    }
1334
1335    Ok(())
1336}
1337
1338fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1339    let config_path = shard_dir.join("config.json");
1340    let mut config: std::collections::BTreeMap<String, String> = if config_path.exists() {
1341        let data = fs::read(&config_path)?;
1342        metadata::deserialize(&data)?
1343    } else {
1344        std::collections::BTreeMap::new()
1345    };
1346    let overrides = config::load_env_overrides();
1347    for (k, v) in overrides {
1348        config.insert(k, v);
1349    }
1350    Ok(config)
1351}
1352
1353fn save_config(
1354    shard_dir: &Path,
1355    config: &std::collections::BTreeMap<String, String>,
1356) -> Result<()> {
1357    let fmt = MetadataFormat::from_config(config);
1358    let data = metadata::serialize(config, &fmt);
1359    fs::write(shard_dir.join("config.json"), data)?;
1360    Ok(())
1361}
1362
/// Per-user key directory, `$HOME/.shard/keys`.
///
/// Returns `None` when `HOME` is unset or not valid Unicode.
fn global_keys_dir() -> Option<PathBuf> {
    let home = std::env::var("HOME").ok()?;
    let mut dir = PathBuf::from(home);
    dir.push(".shard");
    dir.push("keys");
    Some(dir)
}
1368
1369fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1370    let local = shard_dir.join("keys");
1371    if local.join("secret.key").exists() {
1372        match KeyPair::load(&local) {
1373            Ok(kp) => return Ok(kp),
1374            Err(e) => {
1375                if e.to_string().contains("encrypted") {
1376                    if let Some(passphrase) = get_cached_passphrase() {
1377                        return KeyPair::load_with_passphrase(&local, &passphrase);
1378                    }
1379                    anyhow::bail!(
1380                        "secret.key is encrypted. Use `shard unlock --passphrase <pass>` or provide --passphrase"
1381                    );
1382                }
1383                return Err(e);
1384            }
1385        }
1386    }
1387    if let Some(global) = global_keys_dir() {
1388        if global.join("secret.key").exists() {
1389            match KeyPair::load(&global) {
1390                Ok(kp) => return Ok(kp),
1391                Err(e) => {
1392                    if e.to_string().contains("encrypted") {
1393                        if let Some(passphrase) = get_cached_passphrase() {
1394                            return KeyPair::load_with_passphrase(&global, &passphrase);
1395                        }
1396                    }
1397                    return Err(e);
1398                }
1399            }
1400        }
1401    }
1402    anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1403}
1404
1405fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1406    let local = shard_dir.join("keys/public.key");
1407    if local.exists() {
1408        return Ok(fs::read(&local)?);
1409    }
1410    if let Some(global) = global_keys_dir() {
1411        let gp = global.join("public.key");
1412        if gp.exists() {
1413            return Ok(fs::read(&gp)?);
1414        }
1415    }
1416    anyhow::bail!(
1417        "no public key found in {} or ~/.shard/keys/",
1418        local.display()
1419    )
1420}
1421
1422fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1423    let config = load_config(shard_dir)?;
1424    if config.get("private").map(|s| s.as_str()) == Some("true") {
1425        let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1426        Ok(Some(encryption::RepoCipher::from_key(&key)))
1427    } else {
1428        Ok(None)
1429    }
1430}
1431
1432pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1433    let shard_dir = path.join(".shard");
1434    if !shard_dir.exists() {
1435        anyhow::bail!("not a shard repository (run `shard init` first)");
1436    }
1437    let config = load_config(&shard_dir)?;
1438    if let Some(key) = key {
1439        match config.get(key) {
1440            Some(value) => info!("{} = {}", key, value),
1441            None => anyhow::bail!("config key not found: {}", key),
1442        }
1443    } else {
1444        for (k, v) in &config {
1445            info!("{} = {}", k, v);
1446        }
1447    }
1448    Ok(())
1449}
1450
1451pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1452    let shard_dir = path.join(".shard");
1453    if !shard_dir.exists() {
1454        anyhow::bail!("not a shard repository (run `shard init` first)");
1455    }
1456    let mut config = load_config(&shard_dir)?;
1457    config.insert(key.to_string(), value.to_string());
1458    save_config(&shard_dir, &config)?;
1459    info!("{} = {}", key, value);
1460    Ok(())
1461}
1462
1463pub(crate) fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1464    let tags_path = shard_dir.join("tags.json");
1465    if tags_path.exists() {
1466        let data = fs::read(&tags_path)?;
1467        Ok(serde_json::from_slice(&data)?)
1468    } else {
1469        Ok(std::collections::BTreeMap::new())
1470    }
1471}
1472
1473pub(crate) fn save_tags(
1474    shard_dir: &Path,
1475    tags: &std::collections::BTreeMap<String, String>,
1476) -> Result<()> {
1477    let data = serde_json::to_string_pretty(tags)?;
1478    fs::write(shard_dir.join("tags.json"), data)?;
1479    Ok(())
1480}
1481
1482pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1483    let shard_dir = path.join(".shard");
1484    if !shard_dir.exists() {
1485        anyhow::bail!("not a shard repository (run `shard init` first)");
1486    }
1487    // Verify commit exists
1488    let store = Store::open(&shard_dir)?;
1489    load_commit(&store, commit_id)?;
1490    let mut tags = load_tags(&shard_dir)?;
1491    tags.insert(name.to_string(), commit_id.to_string());
1492    save_tags(&shard_dir, &tags)?;
1493    info!("Tagged '{}' -> {}", name, commit_id);
1494    Ok(())
1495}
1496
1497pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1498    let shard_dir = path.join(".shard");
1499    if !shard_dir.exists() {
1500        anyhow::bail!("not a shard repository (run `shard init` first)");
1501    }
1502    let id = match commit_id {
1503        Some(cid) => cid.to_string(),
1504        None => {
1505            let (_, head) = branch::resolve_head(&shard_dir)?;
1506            head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1507        }
1508    };
1509    // Verify commit exists
1510    let store = Store::open(&shard_dir)?;
1511    load_commit(&store, &id)?;
1512    branch::create_branch(&shard_dir, name, &id)
1513}
1514
1515pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1516    let shard_dir = path.join(".shard");
1517    if !shard_dir.exists() {
1518        anyhow::bail!("not a shard repository (run `shard init` first)");
1519    }
1520    branch::delete_branch(&shard_dir, name)
1521}
1522
1523pub fn branch_list(path: &Path) -> Result<()> {
1524    let shard_dir = path.join(".shard");
1525    if !shard_dir.exists() {
1526        anyhow::bail!("not a shard repository (run `shard init` first)");
1527    }
1528    let (current, branches) = branch::list_branches(&shard_dir)?;
1529    if branches.is_empty() {
1530        info!("No branches.");
1531        return Ok(());
1532    }
1533    for (name, commit_id) in &branches {
1534        let prefix = if current.as_deref() == Some(name) {
1535            "* "
1536        } else {
1537            "  "
1538        };
1539        info!(
1540            "{}{} ({})",
1541            prefix,
1542            name,
1543            &commit_id[..8.min(commit_id.len())]
1544        );
1545    }
1546    Ok(())
1547}
1548
1549pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1550    let _guard = OP_QUEUES
1551        .get_or_create(&path.to_path_buf())
1552        .acquire(ops::OpKind::Write, "merge".to_string());
1553    let shard_dir = path.join(".shard");
1554    if !shard_dir.exists() {
1555        anyhow::bail!("not a shard repository (run `shard init` first)");
1556    }
1557
1558    let store = Store::open(&shard_dir)?;
1559
1560    let config = load_config(&shard_dir)?;
1561    let fmt = MetadataFormat::from_config(&config);
1562
1563    // Resolve current HEAD
1564    let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1565    let current_id =
1566        current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1567
1568    // Resolve source branch
1569    let source_id = branch::resolve_rev(&shard_dir, branch)?;
1570    if source_id == current_id {
1571        anyhow::bail!("Already up to date — source is the same commit as HEAD");
1572    }
1573
1574    // Load both commits
1575    let current_commit = load_commit(&store, &current_id)?;
1576    let source_commit = load_commit(&store, &source_id)?;
1577
1578    // Load manifests from both sides
1579    let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1580        std::collections::HashMap::new();
1581
1582    for manifest_id in &current_commit.manifests {
1583        let data = store.get_chunk(manifest_id)?;
1584        let manifest: FileManifest = metadata::deserialize(&data)?;
1585        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1586    }
1587
1588    for manifest_id in &source_commit.manifests {
1589        let data = store.get_chunk(manifest_id)?;
1590        let manifest: FileManifest = metadata::deserialize(&data)?;
1591        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1592    }
1593
1594    // Store merged manifests (signed)
1595    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1596    let keys = load_keypair(&shard_dir)?;
1597    let signing_key = keys.signing_key;
1598    let mut merged_manifest_ids = Vec::new();
1599    for (name, chunks) in merged_manifests.values() {
1600        let compression = Compression::None;
1601        let mut manifest = FileManifest {
1602            name: name.clone(),
1603            size: 0,
1604            chunks: chunks.clone(),
1605            content_type: None,
1606            compression: compression.as_str().to_string(),
1607            merkle_root: Some(FileManifest::merkle_root(chunks)),
1608            created_by: Some(author.to_string()),
1609            created_at: Some(timestamp),
1610            signature: None,
1611        };
1612
1613        let mut unsigned = manifest.clone();
1614        unsigned.signature = None;
1615        let canonical = metadata::serialize_for_signing(&unsigned);
1616        let sig = signing_key.sign(&canonical);
1617        manifest.signature = Some(hex::encode(sig.to_bytes()));
1618
1619        let encoded = metadata::serialize(&manifest, &fmt);
1620        let hash = blake3::hash(&encoded);
1621        store.put_chunk(&crate::chunker::Chunk {
1622            hash,
1623            data: encoded,
1624            offset: 0,
1625        })?;
1626        merged_manifest_ids.push(hash.to_hex().to_string());
1627    }
1628    merged_manifest_ids.sort();
1629
1630    // Create merge commit
1631    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1632    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1633    let parents = vec![current_id.clone(), source_id.clone()];
1634    let mut commit = Commit {
1635        commit_id: String::new(),
1636        parents,
1637        manifests: merged_manifest_ids,
1638        author: author.to_string(),
1639        message: message.to_string(),
1640        timestamp,
1641        public_key: Some(public_key_hex),
1642        signature: None,
1643        key_id,
1644    };
1645
1646    let json_unsigned = metadata::serialize_for_signing(&commit);
1647    let signature = signing_key.sign(&json_unsigned);
1648    commit.signature = Some(hex::encode(signature.to_bytes()));
1649
1650    let encoded = metadata::serialize(&commit, &fmt);
1651    let hash = blake3::hash(&encoded);
1652    store.put_chunk(&crate::chunker::Chunk {
1653        hash,
1654        data: encoded,
1655        offset: 0,
1656    })?;
1657
1658    let merge_commit_id = hash.to_hex().to_string();
1659
1660    // Cycle detection on merge
1661    if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1662        anyhow::bail!(
1663            "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1664            merge_commit_id
1665        );
1666    }
1667
1668    // Update HEAD and branch ref
1669    if let Some(ref branch_name) = current_branch {
1670        branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1671        branch::set_head_branch(&shard_dir, branch_name)?;
1672    } else {
1673        branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1674    }
1675
1676    if json {
1677        info!(
1678            "{}",
1679            serde_json::to_string(&serde_json::json!({
1680                "commit_id": merge_commit_id,
1681                "message": message,
1682            }))?
1683        );
1684    } else {
1685        info!("Merge commit {} ({})", merge_commit_id, message);
1686    }
1687    Ok(())
1688}
1689
1690pub fn tag_list(path: &Path) -> Result<()> {
1691    let shard_dir = path.join(".shard");
1692    if !shard_dir.exists() {
1693        anyhow::bail!("not a shard repository (run `shard init` first)");
1694    }
1695    let tags = load_tags(&shard_dir)?;
1696    if tags.is_empty() {
1697        info!("No tags.");
1698    } else {
1699        for (name, commit_id) in &tags {
1700            info!("{} -> {}", name, commit_id);
1701        }
1702    }
1703    Ok(())
1704}
1705
1706fn collect_reachable(
1707    store: &Store,
1708    commit_id: &str,
1709    seen_commits: &mut std::collections::HashSet<String>,
1710    reachable: &mut std::collections::HashSet<String>,
1711) -> Result<()> {
1712    if !seen_commits.insert(commit_id.to_string()) {
1713        return Ok(());
1714    }
1715
1716    reachable.insert(commit_id.to_string());
1717
1718    let commit = match load_commit(store, commit_id) {
1719        Ok(c) => c,
1720        Err(_) => return Ok(()),
1721    };
1722
1723    for manifest_id in &commit.manifests {
1724        reachable.insert(manifest_id.clone());
1725
1726        if let Ok(data) = store.get_chunk(manifest_id) {
1727            if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1728                for chunk_id in &manifest.chunks {
1729                    reachable.insert(chunk_id.clone());
1730                }
1731            }
1732        }
1733    }
1734
1735    for parent_id in &commit.parents {
1736        collect_reachable(store, parent_id, seen_commits, reachable)?;
1737    }
1738
1739    Ok(())
1740}
1741
1742pub fn prune(path: &Path, json: bool) -> Result<()> {
1743    let _guard = OP_QUEUES
1744        .get_or_create(&path.to_path_buf())
1745        .acquire(ops::OpKind::Write, "prune".to_string());
1746    let shard_dir = path.join(".shard");
1747    if !shard_dir.exists() {
1748        anyhow::bail!("not a shard repository (run `shard init` first)");
1749    }
1750
1751    let config = load_config(&shard_dir)?;
1752    let fmt = MetadataFormat::from_config(&config);
1753
1754    let store = Store::open(&shard_dir)?;
1755    let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1756
1757    // 1. Walk from HEAD commit (and all branch tips)
1758    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1759    if let Some(ref head) = head_commit {
1760        collect_reachable(
1761            &store,
1762            head,
1763            &mut std::collections::HashSet::new(),
1764            &mut reachable,
1765        )?;
1766    }
1767
1768    // Also walk from all branch refs (in case HEAD is detached from any branch)
1769    if let Ok(branches) = branch::list_branches(&shard_dir) {
1770        for (_, commit_id) in branches.1 {
1771            collect_reachable(
1772                &store,
1773                &commit_id,
1774                &mut std::collections::HashSet::new(),
1775                &mut reachable,
1776            )?;
1777        }
1778    }
1779
1780    // 2. Walk from tags
1781    let tags = load_tags(&shard_dir)?;
1782    for commit_id in tags.values() {
1783        collect_reachable(
1784            &store,
1785            commit_id,
1786            &mut std::collections::HashSet::new(),
1787            &mut reachable,
1788        )?;
1789    }
1790
1791    // 3. Walk from index (staged files)
1792    let index = Index::load(&shard_dir.join("index"), &fmt)?;
1793    for manifest in index.files.values() {
1794        let json = metadata::serialize(manifest, &fmt);
1795        let hash = blake3::hash(&json);
1796        let hash_hex = hash.to_hex().to_string();
1797        reachable.insert(hash_hex);
1798        for chunk_hash in &manifest.chunks {
1799            reachable.insert(chunk_hash.clone());
1800        }
1801    }
1802
1803    // 4. Scan objects and remove unreachable
1804    let all_chunks = store.iter_chunks()?;
1805    let mut pruned = 0u64;
1806    let mut kept = 0u64;
1807    for (hash_hex, full_path) in &all_chunks {
1808        if !reachable.contains(hash_hex) {
1809            store.delete_chunk(hash_hex, Some(full_path))?;
1810            pruned += 1;
1811        } else {
1812            kept += 1;
1813        }
1814    }
1815
1816    if json {
1817        info!(
1818            "{}",
1819            serde_json::to_string(&serde_json::json!({
1820                "pruned": pruned,
1821                "remaining": kept,
1822            }))?
1823        );
1824    } else {
1825        info!("Pruned {} objects. {} objects remain.", pruned, kept);
1826    }
1827    Ok(())
1828}
1829
1830pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1831    let shard_dir = path.join(".shard");
1832    if !shard_dir.exists() {
1833        anyhow::bail!("not a shard repository (run `shard init` first)");
1834    }
1835
1836    // Validate multiaddr format
1837    if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1838        anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1839    }
1840
1841    let peers_path = shard_dir.join("peers.json");
1842    let mut peers: Vec<String> = if peers_path.exists() {
1843        let data = fs::read(&peers_path)?;
1844        serde_json::from_slice(&data)?
1845    } else {
1846        Vec::new()
1847    };
1848
1849    let added = if !peers.contains(&multiaddr.to_string()) {
1850        peers.push(multiaddr.to_string());
1851        let data = serde_json::to_vec(&peers)?;
1852        fs::write(peers_path, data)?;
1853        true
1854    } else {
1855        false
1856    };
1857
1858    if json {
1859        info!(
1860            "{}",
1861            serde_json::to_string(&serde_json::json!({
1862                "multiaddr": multiaddr,
1863                "added": added,
1864            }))?
1865        );
1866    } else if added {
1867        info!("Added peer: {}", multiaddr);
1868    } else {
1869        info!("Peer already exists: {}", multiaddr);
1870    }
1871
1872    Ok(())
1873}
1874
1875fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1876    let peers_path = shard_dir.join("peers.json");
1877    if peers_path.exists() {
1878        let data = fs::read(peers_path)?;
1879        Ok(serde_json::from_slice(&data)?)
1880    } else {
1881        Ok(Vec::new())
1882    }
1883}
1884
/// Location of the `authorized_keys` file inside a `.shard` directory.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut location = shard_dir.to_path_buf();
    location.push("authorized_keys");
    location
}
1888
/// An authorized key entry with optional comment: one parsed line of the
/// `authorized_keys` file, whose format is `<hex_key> [comment]`.
struct AuthorizedKeyEntry {
    /// Ed25519 public key decoded from the line's first field (32 bytes, i.e. 64 hex chars).
    key: ed25519_dalek::VerifyingKey,
    /// Rest of the line after the key, trimmed; empty when the line has no comment.
    comment: String,
}
1894
1895fn parse_authorized_key_line(line: &str) -> Option<AuthorizedKeyEntry> {
1896    let line = line.trim();
1897    if line.is_empty() || line.starts_with('#') {
1898        return None;
1899    }
1900    // Format: <hex_key> [comment]
1901    let parts: Vec<&str> = line.splitn(2, char::is_whitespace).collect();
1902    let hex_key = parts[0].trim();
1903    let comment = parts.get(1).unwrap_or(&"").trim().to_string();
1904    let bytes = hex::decode(hex_key).ok()?;
1905    let arr: [u8; 32] = bytes.try_into().ok()?;
1906    let key = ed25519_dalek::VerifyingKey::from_bytes(&arr).ok()?;
1907    Some(AuthorizedKeyEntry { key, comment })
1908}
1909
1910fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1911    let path = authorized_keys_path(shard_dir);
1912    if !path.exists() {
1913        return Ok(Vec::new());
1914    }
1915    let content = fs::read_to_string(&path)?;
1916    let mut keys = Vec::new();
1917    for line in content.lines() {
1918        if let Some(entry) = parse_authorized_key_line(line) {
1919            keys.push(entry.key);
1920        }
1921    }
1922    Ok(keys)
1923}
1924
1925pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1926    // Validate the key
1927    let bytes = hex::decode(public_key_hex)?;
1928    let arr: [u8; 32] = bytes
1929        .as_slice()
1930        .try_into()
1931        .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1932    let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1933
1934    let path = authorized_keys_path(shard_dir);
1935    let mut content = if path.exists() {
1936        fs::read_to_string(&path)?
1937    } else {
1938        String::new()
1939    };
1940    // Check if key already exists
1941    if content.lines().any(|l| {
1942        if let Some(entry) = parse_authorized_key_line(l) {
1943            entry.key.to_bytes().to_vec() == arr.to_vec()
1944        } else {
1945            false
1946        }
1947    }) {
1948        info!("Key already authorized");
1949        return Ok(());
1950    }
1951    content.push_str(public_key_hex);
1952    content.push('\n');
1953    fs::write(&path, content)?;
1954    info!("Authorized key added");
1955    Ok(())
1956}
1957
1958pub fn remove_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1959    let path = authorized_keys_path(shard_dir);
1960    if !path.exists() {
1961        anyhow::bail!("authorized_keys file does not exist");
1962    }
1963    let content = fs::read_to_string(&path)?;
1964    let new_content: Vec<&str> = content
1965        .lines()
1966        .filter(|l| {
1967            if let Some(entry) = parse_authorized_key_line(l) {
1968                hex::encode(entry.key.to_bytes()) != public_key_hex
1969            } else {
1970                true
1971            }
1972        })
1973        .collect();
1974    if new_content.len() == content.lines().count() {
1975        anyhow::bail!("Key not found in authorized_keys");
1976    }
1977    let has_keys = new_content
1978        .iter()
1979        .any(|l| parse_authorized_key_line(l).is_some());
1980    if has_keys {
1981        fs::write(&path, new_content.join("\n") + "\n")?;
1982    } else {
1983        fs::remove_file(&path)?;
1984    }
1985    info!("Authorized key removed");
1986    Ok(())
1987}
1988
1989pub fn list_authorized_keys(shard_dir: &Path, json: bool) -> Result<()> {
1990    let path = authorized_keys_path(shard_dir);
1991    if !path.exists() {
1992        if json {
1993            info!(
1994                "{}",
1995                serde_json::to_string(&serde_json::json!({"keys": []}))?
1996            );
1997        } else {
1998            println!("No authorized keys.");
1999        }
2000        return Ok(());
2001    }
2002    let content = fs::read_to_string(&path)?;
2003    let mut entries = Vec::new();
2004    for line in content.lines() {
2005        if let Some(entry) = parse_authorized_key_line(line) {
2006            entries.push(serde_json::json!({
2007                "key": hex::encode(entry.key.to_bytes()),
2008                "comment": entry.comment,
2009            }));
2010        }
2011    }
2012    if entries.is_empty() {
2013        if json {
2014            info!(
2015                "{}",
2016                serde_json::to_string(&serde_json::json!({"keys": []}))?
2017            );
2018        } else {
2019            println!("No authorized keys.");
2020        }
2021        return Ok(());
2022    }
2023    if json {
2024        info!(
2025            "{}",
2026            serde_json::to_string(&serde_json::json!({"keys": entries}))?
2027        );
2028    } else {
2029        for e in &entries {
2030            let comment = e["comment"].as_str().unwrap_or("");
2031            if comment.is_empty() {
2032                println!("  {}", e["key"].as_str().unwrap_or(""));
2033            } else {
2034                println!("  {}  {}", e["key"].as_str().unwrap_or(""), comment);
2035            }
2036        }
2037    }
2038    Ok(())
2039}
2040
2041pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
2042    let shard_dir = path.join(".shard");
2043    if !shard_dir.exists() {
2044        anyhow::bail!("not a shard repository (run `shard init` first)");
2045    }
2046    let file = fs::File::create(output)?;
2047    let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
2048    let mut archive = tar::Builder::new(encoder);
2049    archive.append_dir_all(".shard", &shard_dir)?;
2050    archive.finish()?;
2051    if json {
2052        info!(
2053            "{}",
2054            serde_json::to_string(&serde_json::json!({
2055                "path": output.to_string_lossy(),
2056            }))?
2057        );
2058    } else {
2059        info!("Backup created: {}", output.display());
2060    }
2061    Ok(())
2062}
2063
2064pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
2065    let shard_dir = path.join(".shard");
2066    if !shard_dir.exists() {
2067        anyhow::bail!("not a shard repository (run `shard init` first)");
2068    }
2069    let store = Store::open(&shard_dir)?;
2070    let cipher = maybe_load_cipher(&shard_dir)?;
2071    let commit = load_commit(&store, commit_id)?;
2072    let mut files = Vec::new();
2073    for manifest_id in &commit.manifests {
2074        let data = store.get_chunk(manifest_id)?;
2075        let manifest: FileManifest = metadata::deserialize(&data)?;
2076        let compression = manifest.compression.parse::<Compression>()?;
2077        if !json {
2078            info!("Exporting file: {}", manifest.name);
2079        }
2080        let mut file_data = Vec::new();
2081        for chunk_id in &manifest.chunks {
2082            let chunk_data = store.get_chunk(chunk_id)?;
2083            let decrypted = match &cipher {
2084                Some(c) => c.decrypt(&chunk_data)?,
2085                None => chunk_data,
2086            };
2087            let decompressed = compression.decompress(&decrypted)?;
2088            file_data.extend_from_slice(&decompressed);
2089        }
2090        let out_path = output_dir.join(&manifest.name);
2091        if let Some(parent) = out_path.parent() {
2092            fs::create_dir_all(parent)?;
2093        }
2094        fs::write(&out_path, file_data)?;
2095        if !json {
2096            info!("  -> {}", out_path.display());
2097        }
2098        files.push(manifest.name);
2099    }
2100    if json {
2101        info!(
2102            "{}",
2103            serde_json::to_string(&serde_json::json!({
2104                "commit_id": commit_id,
2105                "files": files,
2106                "output_dir": output_dir.to_string_lossy(),
2107            }))?
2108        );
2109    } else {
2110        info!("Export complete.");
2111    }
2112    Ok(())
2113}
2114
2115pub fn import(
2116    path: &Path,
2117    source_dir: &Path,
2118    message: &str,
2119    author: &str,
2120    json: bool,
2121) -> Result<()> {
2122    let shard_dir = path.join(".shard");
2123    if !shard_dir.exists() {
2124        anyhow::bail!("not a shard repository (run `shard init` first)");
2125    }
2126    // Walk files in source_dir
2127    let config = load_config(&shard_dir)?;
2128    let compression: Compression = config
2129        .get("compression")
2130        .map(|s| s.as_str())
2131        .unwrap_or("zstd")
2132        .parse()?;
2133    let chunker_mode = chunker::ChunkerMode::from_config(&config);
2134    let fmt = MetadataFormat::from_config(&config);
2135    let store = Store::open(&shard_dir)?;
2136    let cipher = maybe_load_cipher(&shard_dir)?;
2137    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
2138    if !source_dir.is_dir() {
2139        anyhow::bail!("Source must be a directory");
2140    }
2141    for entry in walkdir::WalkDir::new(source_dir)
2142        .into_iter()
2143        .filter_entry(|e| {
2144            e.file_name()
2145                .to_str()
2146                .map(|s| !s.starts_with('.'))
2147                .unwrap_or(false)
2148        })
2149    {
2150        let entry = entry?;
2151        if entry.file_type().is_file() {
2152            add_file(
2153                path,
2154                entry.path(),
2155                &store,
2156                &mut index,
2157                &compression,
2158                &chunker_mode,
2159                cipher.as_ref(),
2160                json,
2161            )?;
2162        }
2163    }
2164    index.save(&shard_dir.join("index"), &fmt)?;
2165    // Auto-commit
2166    if !index.files.is_empty() {
2167        commit(path, message, author, json)?;
2168    } else if json {
2169        info!(
2170            "{}",
2171            serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
2172        );
2173    } else {
2174        info!("No files found to import.");
2175    }
2176    Ok(())
2177}
2178
2179pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
2180    let shard_dir = path.join(".shard");
2181    if shard_dir.exists() {
2182        anyhow::bail!(
2183            "Repository already exists — remove .shard first or use a different directory"
2184        );
2185    }
2186    let file = fs::File::open(backup_file)?;
2187    let decoder = flate2::read::GzDecoder::new(file);
2188    let mut archive = tar::Archive::new(decoder);
2189    archive.unpack(path)?;
2190    // Verify the result
2191    if !path.join(".shard").exists() {
2192        anyhow::bail!("Backup does not contain a valid .shard directory");
2193    }
2194    if json {
2195        info!(
2196            "{}",
2197            serde_json::to_string(&serde_json::json!({
2198                "backup": backup_file.to_string_lossy(),
2199            }))?
2200        );
2201    } else {
2202        info!("Restored from {}", backup_file.display());
2203    }
2204    Ok(())
2205}
2206
/// Content provider handed to the p2p node by `share` and `sync`. It serves
/// this repository's objects to peers and authenticates them.
struct RepoProvider {
    /// Object store used for manifest/chunk lookups and for storing chunks
    /// received from peers.
    store: Store,
    /// The repository's `.shard` directory. Used to locate `authorized_keys`
    /// and the repository keypair.
    shard_dir: std::path::PathBuf,
}
2211
2212impl shard_net::p2p::ShardContentProvider for RepoProvider {
2213    fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
2214        self.store.get_chunk(id).ok()
2215    }
2216    fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
2217        self.store.get_chunk(id).ok()
2218    }
2219    fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
2220        let hash = blake3::hash(data);
2221        let hex = hash.to_hex().to_string();
2222        if hex != id {
2223            return false;
2224        }
2225        self.store
2226            .put_chunk(&crate::chunker::Chunk {
2227                hash,
2228                data: data.to_vec(),
2229                offset: 0,
2230            })
2231            .is_ok()
2232    }
2233    fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
2234        use ed25519_dalek::Verifier;
2235        let pk_bytes: [u8; 32] = match public_key.try_into() {
2236            Ok(b) => b,
2237            Err(_) => return false,
2238        };
2239        let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
2240            Ok(k) => k,
2241            Err(_) => return false,
2242        };
2243        let sig_bytes: [u8; 64] = match signature.try_into() {
2244            Ok(b) => b,
2245            Err(_) => return false,
2246        };
2247        let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
2248        if pk.verify(nonce, &sig).is_err() {
2249            return false;
2250        }
2251        // Check authorized_keys if the file exists.
2252        // If authorized_keys does not exist, repo is open (backward compat).
2253        // If it exists, the key MUST be in the list.
2254        let auth_path = self.shard_dir.join("authorized_keys");
2255        if auth_path.exists() {
2256            if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
2257                return keys.contains(&pk);
2258            }
2259            return false;
2260        }
2261        true
2262    }
2263    fn repo_public_key(&self) -> Option<Vec<u8>> {
2264        let keys = load_keypair(&self.shard_dir).ok()?;
2265        Some(keys.verifying_key.to_bytes().to_vec())
2266    }
2267}
2268
2269/// Compute file count and total size for a commit's manifests.
2270fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
2271    let store = Store::open(shard_dir)?;
2272    let commit = load_commit(&store, commit_id)?;
2273    let mut file_count = 0u64;
2274    let mut total_size = 0u64;
2275    for mid in &commit.manifests {
2276        if let Ok(data) = store.get_chunk(mid) {
2277            if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
2278                file_count += 1;
2279                total_size += m.size;
2280            }
2281        }
2282    }
2283    Ok((file_count, total_size))
2284}
2285
2286pub async fn share(path: &Path, json: bool) -> Result<()> {
2287    let shard_dir = path.join(".shard");
2288    if !shard_dir.exists() {
2289        anyhow::bail!("not a shard repository (run `shard init` first)");
2290    }
2291
2292    let mut node = shard_net::p2p::Node::new().await?;
2293
2294    // Subscribe to global announcement topic
2295    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2296    node.subscribe(&ann_topic)?;
2297
2298    // Bootstrap from peers
2299    let peers = load_peers(&shard_dir)?;
2300    for peer in peers {
2301        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2302            let _ = node.swarm.dial(addr);
2303        }
2304    }
2305
2306    // Trigger Kademlia DHT bootstrap
2307    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2308
2309    node.listen("/ip4/0.0.0.0/tcp/0").await?;
2310    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2311    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2312
2313    let config = load_config(&shard_dir)?;
2314    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2315
2316    let store = Store::open(&shard_dir)?;
2317    let provider = RepoProvider {
2318        store,
2319        shard_dir: shard_dir.clone(),
2320    };
2321
2322    // Publish announcement for HEAD commit
2323    if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
2324        let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
2325        let ann = shard_net::protocol::Announcement {
2326            commit_id: head,
2327            file_count,
2328            total_size,
2329            repo_name: repo_name.clone(),
2330            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2331        };
2332        let payload = serde_json::to_vec(&ann)?;
2333        let _ = node.publish(&ann_topic, payload);
2334    }
2335
2336    if json {
2337        info!(
2338            "{}",
2339            serde_json::to_string(&serde_json::json!({
2340                "status": "sharing",
2341                "peer_id": node.local_peer_id().to_string(),
2342            }))?
2343        );
2344    } else {
2345        info!("Sharing repository...");
2346    }
2347    node.run(provider).await;
2348
2349    Ok(())
2350}
2351
2352/// Start a circuit relay v2 server for NAT traversal.
2353/// Listens on the given address and forwards traffic between peers.
2354pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
2355    let mut node = shard_net::p2p::Node::new().await?;
2356    node.listen(listen_addr).await?;
2357    if json {
2358        info!(
2359            "{}",
2360            serde_json::to_string(&serde_json::json!({
2361                "status": "relay active",
2362                "listen": listen_addr,
2363                "peer_id": node.local_peer_id().to_string(),
2364            }))?
2365        );
2366    } else {
2367        info!("Relay server active on {}", listen_addr);
2368        info!("Peer ID: {}", node.local_peer_id());
2369        info!("Ready to accept circuit relay v2 reservations");
2370    }
2371    node.run(EmptyProvider).await;
2372    Ok(())
2373}
2374
2375/// Minimal provider for relay-only mode (no repo content needed).
2376struct EmptyProvider;
2377impl shard_net::p2p::ShardContentProvider for EmptyProvider {
2378    fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
2379        None
2380    }
2381    fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
2382        None
2383    }
2384    fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
2385        false
2386    }
2387    fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
2388        false
2389    }
2390    fn repo_public_key(&self) -> Option<Vec<u8>> {
2391        None
2392    }
2393}
2394
2395pub async fn sync(path: &Path, _json: bool) -> Result<()> {
2396    let shard_dir = path.join(".shard");
2397    if !shard_dir.exists() {
2398        anyhow::bail!("not a shard repository (run `shard init` first)");
2399    }
2400
2401    let config = load_config(&shard_dir)?;
2402    let repo_id = config.get("repo_id").cloned().unwrap_or_default();
2403    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2404    let rate_limit_max = config::config_get_rate_limit_max(&config);
2405    let _rate_limit_window = config::config_get_rate_limit_window(&config);
2406    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2407    let repo_topic =
2408        shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
2409
2410    let mut node = shard_net::p2p::Node::new().await?;
2411    node.subscribe(&ann_topic)?;
2412    node.subscribe(&repo_topic)?;
2413    node.listen("/ip4/0.0.0.0/tcp/0").await?;
2414    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2415    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2416
2417    // Bootstrap from configured peers
2418    let peers = load_peers(&shard_dir)?;
2419    for peer in peers {
2420        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2421            let _ = node.swarm.dial(addr);
2422        }
2423    }
2424
2425    // Trigger Kademlia DHT bootstrap
2426    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2427
2428    // Helper to publish announcement to both global and repo-specific topics
2429    let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2430        if let Ok(payload) = serde_json::to_vec(ann) {
2431            let _ = node.publish(&ann_topic, payload.clone());
2432            let _ = node.publish(&repo_topic, payload);
2433        }
2434    };
2435
2436    let head_commit = branch::resolve_head(&shard_dir)?.1;
2437
2438    // Initial announce (may fail with InsufficientPeers if no peers yet)
2439    if let Some(ref head) = head_commit {
2440        let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2441        let ann = shard_net::protocol::Announcement {
2442            commit_id: head.clone(),
2443            file_count,
2444            total_size,
2445            repo_name: repo_name.clone(),
2446            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2447        };
2448        publish_ann(&mut node, &ann);
2449        if !_json {
2450            info!("Announced commit {} on sync topic", head)
2451        }
2452    } else if !_json {
2453        info!("No commits to announce");
2454    }
2455
2456    if !_json {
2457        info!("Syncing on topic with peer id: {}", node.local_peer_id());
2458    }
2459    let _ = std::io::stdout().flush();
2460
2461    let store = Store::open(&shard_dir)?;
2462    let mut provider = RepoProvider {
2463        store,
2464        shard_dir: shard_dir.clone(),
2465    };
2466
2467    let mut interval = tokio::time::interval(Duration::from_secs(5));
2468    let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2469        HashMap::new();
2470    let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2471    let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2472    let path_buf = path.to_path_buf();
2473
2474    loop {
2475        tokio::select! {
2476            _ = tokio::signal::ctrl_c() => {
2477                info!("\nSync shutting down...");
2478                break Ok(());
2479            }
2480            _ = interval.tick() => {
2481                // Periodic rate-limit counter reset (rolling 60s window)
2482                request_counts.clear();
2483                announce_counts.clear();
2484                if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2485                    let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2486                    let ann = shard_net::protocol::Announcement {
2487                        commit_id: head.clone(),
2488                        file_count: fc,
2489                        total_size: ts,
2490                        repo_name: repo_name.clone(),
2491                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2492                    };
2493                    publish_ann(&mut node, &ann);
2494                    info!("Re-announced commit {} on sync topic", head);
2495                }
2496            }
2497            event = node.swarm.select_next_some() => {
2498                match event {
2499                    shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2500                        info!("Listening on {address:?}");
2501                        let _ = std::io::stdout().flush();
2502                    }
2503                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2504                        shard_net::p2p::ShardBehaviourEvent::Mdns(
2505                            shard_net::libp2p::mdns::Event::Discovered(list),
2506                        ),
2507                    ) => {
2508                        for (peer_id, multiaddr) in list {
2509                            info!("mDNS discovered: {peer_id} {multiaddr}");
2510                            address_book.entry(peer_id).or_default().push(multiaddr.clone());
2511                            node.swarm
2512                                .behaviour_mut()
2513                                .gossipsub
2514                                .add_explicit_peer(&peer_id);
2515                            node.swarm
2516                                .behaviour_mut()
2517                                .kademlia
2518                                .add_address(&peer_id, multiaddr);
2519                        }
2520                    }
2521                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2522                        shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2523                            list,
2524                        )),
2525                    ) => {
2526                        for (peer_id, _multiaddr) in list {
2527                            info!("mDNS expired: {peer_id}");
2528                            node.swarm
2529                                .behaviour_mut()
2530                                .gossipsub
2531                                .remove_explicit_peer(&peer_id);
2532                        }
2533                    }
2534                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2535                        shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2536                            shard_net::libp2p::gossipsub::Event::Message {
2537                                propagation_source,
2538                                message,
2539                                ..
2540                            },
2541                        ),
2542                    ) => {
2543                        // Parse structured Announcement JSON
2544                        if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2545                            let peer = propagation_source;
2546                            let commit_id_owned = ann.commit_id;
2547                            info!(
2548                                "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2549                                peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2550                            );
2551                            // Store announcer's multiaddr if we don't have it yet
2552                            if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2553                                if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2554                                    address_book.entry(peer).or_default().push(addr);
2555                                }
2556                            }
2557                            // Rate limiting: track announcement count per peer
2558                            let rate_key = (peer, commit_id_owned.clone());
2559                            if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2560                                warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2561                            } else {
2562                                *announce_counts.entry(rate_key).or_insert(0) += 1;
2563                                // Reply with our HEAD if different
2564                                let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2565                                if our_head != commit_id_owned {
2566                                    let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2567                                    let reply = shard_net::protocol::Announcement {
2568                                        commit_id: our_head,
2569                                        file_count: fc,
2570                                        total_size: ts,
2571                                        repo_name: repo_name.clone(),
2572                                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2573                                    };
2574                                    publish_ann(&mut node, &reply);
2575                                }
2576                                if let Some(addrs) = address_book.get(&peer) {
2577                                    if let Some(addr) = addrs.first() {
2578                                        let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2579                                        let path_clone = path_buf.clone();
2580                                        tokio::spawn(async move {
2581                                            match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2582                                                Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2583                                                Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2584                                            }
2585                                        });
2586                                    }
2587                                }
2588                            }
2589                        }
2590                    }
2591                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2592                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2593                            shard_net::libp2p::request_response::Event::Message { peer, message },
2594                        ),
2595                    ) => {
2596                        if let shard_net::libp2p::request_response::Message::Request {
2597                            request, channel, ..
2598                        } = message
2599                        {
2600                            // Per-peer request rate limiting: max rate_limit_max in rate_limit_window window
2601                            let req_count = request_counts.entry(peer).or_insert(0u32);
2602                            *req_count += 1;
2603                            if *req_count > rate_limit_max {
2604                                warn!("Dropping request from {}: rate limit exceeded", peer);
2605                                // Reset counter periodically (cooldown via interval tick)
2606                            } else {
2607                                info!("Received request from {}", peer);
2608                                node.serve_request(&peer, &mut provider, request, channel);
2609                            }
2610                        } else {
2611                            info!("Received Response from {}", peer);
2612                        }
2613                    }
2614                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2615                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2616                            shard_net::libp2p::request_response::Event::OutboundFailure {
2617                                peer, error, ..
2618                            },
2619                        ),
2620                    ) => {
2621                        error!("Outbound failure to {}: {:?}", peer, error);
2622                    }
2623                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2624                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2625                            shard_net::libp2p::request_response::Event::InboundFailure {
2626                                peer, error, ..
2627                            },
2628                        ),
2629                    ) => {
2630                        error!("Inbound failure from {}: {:?}", peer, error);
2631                    }
2632                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2633                        shard_net::p2p::ShardBehaviourEvent::Identify(
2634                            shard_net::libp2p::identify::Event::Received { peer_id, info },
2635                        ),
2636                    ) => {
2637                        info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2638                        for addr in info.listen_addrs {
2639                            address_book.entry(peer_id).or_default().push(addr);
2640                        }
2641                        let _ = std::io::stdout().flush();
2642                    }
2643                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2644                        shard_net::p2p::ShardBehaviourEvent::Identify(event),
2645                    ) => {
2646                        info!("Identify event: {:?}", event);
2647                    }
2648                    shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2649                        info!("Connection established with {}", peer_id);
2650                        // Only store the address when we dialed (it's the peer's listen addr).
2651                        // For listener connections, send_back_addr is the ephemeral port — useless for dialing back.
2652                        if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2653                            address_book.entry(peer_id).or_default().push(address.clone());
2654                        }
2655                        node.swarm
2656                            .behaviour_mut()
2657                            .gossipsub
2658                            .add_explicit_peer(&peer_id);
2659                        // Announce HEAD to the newly connected peer
2660                        if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2661                            let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2662                            let ann = shard_net::protocol::Announcement {
2663                                commit_id: head.clone(),
2664                                file_count: fc,
2665                                total_size: ts,
2666                                repo_name: repo_name.clone(),
2667                                peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2668                            };
2669                            publish_ann(&mut node, &ann);
2670                        }
2671                    }
2672                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2673                        shard_net::p2p::ShardBehaviourEvent::Relay(event),
2674                    ) => {
2675                        info!("Relay event: {:?}", event);
2676                    }
2677                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2678                        shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2679                    ) => {
2680                        info!("DCUtR event: {:?}", event);
2681                    }
2682                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2683                        shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2684                    ) => {
2685                        info!("AutoNAT event: {:?}", event);
2686                    }
2687                    shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2688                        local_addr,
2689                        send_back_addr,
2690                        ..
2691                    } => {
2692                        info!(
2693                            "Incoming connection from {} to {}",
2694                            send_back_addr, local_addr
2695                        );
2696                    }
2697                    e => {
2698                        info!("Event: {:?}", e);
2699                    }
2700                }
2701            }
2702        }
2703    }
2704}
2705
2706pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2707    let shard_dir = path.join(".shard");
2708    // pull can work on empty repo or existing one.
2709
2710    if !shard_dir.exists() {
2711        init(path, "flat", "zstd", "fixed", None, false, false)?;
2712    }
2713
2714    let store = Store::open(&shard_dir)?;
2715    let cipher = maybe_load_cipher(&shard_dir)?;
2716
2717    let mut node = shard_net::p2p::Node::new().await?;
2718    let _ = node.listen("/ip4/0.0.0.0/tcp/0").await;
2719
2720    // Parse peer multiaddr
2721    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2722    let peer_id = match multiaddr.iter().last() {
2723        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2724        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2725    };
2726
2727    // 1. Get Commit (sequential — single request)
2728    if !json {
2729        info!("Pulling commit {} from {}...", commit_id, peer);
2730    }
2731    let commit_data = node
2732        .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2733        .await?;
2734    let hash = blake3::hash(&commit_data);
2735    if hash.to_hex().to_string() != commit_id {
2736        anyhow::bail!("Commit hash mismatch");
2737    }
2738    let chunk = crate::chunker::Chunk {
2739        hash,
2740        data: commit_data.clone(),
2741        offset: 0,
2742    };
2743    store.put_chunk(&chunk)?;
2744
2745    let commit: Commit = metadata::deserialize(&commit_data)?;
2746    if !json {
2747        info!("Got commit: {}", commit.message);
2748    }
2749
2750    // Fetch key rotation records for this commit's key chain
2751    let keys_dir = shard_dir.join("keys");
2752    if let Some(kid) = &commit.key_id {
2753        if keys_dir.join("records").exists() {
2754            if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2755                let missing_rotations: Vec<&KeyRotation> = chain
2756                    .iter()
2757                    .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2758                    .collect();
2759                if !missing_rotations.is_empty() {
2760                    if !json {
2761                        info!(
2762                            "Fetching {} key rotation records from peer...",
2763                            missing_rotations.len()
2764                        );
2765                    }
2766                    let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2767                        missing_rotations
2768                            .iter()
2769                            .map(|r| {
2770                                (
2771                                    r.rotation_id.clone(),
2772                                    shard_net::protocol::ShardRequest::GetChunk(
2773                                        r.rotation_id.clone(),
2774                                    ),
2775                                )
2776                            })
2777                            .collect();
2778                    if let Ok(rot_results) = node
2779                        .request_parallel(&multiaddr, peer_id, rot_requests)
2780                        .await
2781                    {
2782                        for (rot_id, rot_data) in &rot_results {
2783                            let rh = blake3::hash(rot_data);
2784                            if rh.to_hex().to_string() != *rot_id {
2785                                info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2786                                continue;
2787                            }
2788                            store.put_chunk(&crate::chunker::Chunk {
2789                                hash: rh,
2790                                data: rot_data.clone(),
2791                                offset: 0,
2792                            })?;
2793                        }
2794                        if !json {
2795                            info!("Key rotation records synced from peer.");
2796                        }
2797                    }
2798                }
2799            }
2800        }
2801    }
2802
2803    // Set repo_id from commit's public key so clones share the gossipsub topic
2804    if let Some(pk_hex) = &commit.public_key {
2805        let pk_bytes = hex::decode(pk_hex)?;
2806        let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2807        let mut config = load_config(&shard_dir)?;
2808        config.insert("repo_id".to_string(), repo_id);
2809        save_config(&shard_dir, &config)?;
2810    }
2811
2812    // 2. Fetch all manifests in parallel
2813    let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2814        .manifests
2815        .iter()
2816        .map(|id| {
2817            (
2818                id.clone(),
2819                shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2820            )
2821        })
2822        .collect();
2823    let manifest_results = node
2824        .request_parallel(&multiaddr, peer_id, manifest_requests)
2825        .await?;
2826
2827    let mut all_chunk_ids: Vec<String> = Vec::new();
2828    let mut file_manifests: Vec<FileManifest> = Vec::new();
2829    // Map chunk_id -> compression type for verification in step 3
2830    let mut chunk_compression: HashMap<String, String> = HashMap::new();
2831
2832    for (manifest_id, manifest_data) in &manifest_results {
2833        let hash = blake3::hash(manifest_data);
2834        if hash.to_hex().to_string() != *manifest_id {
2835            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2836        }
2837        let chunk = crate::chunker::Chunk {
2838            hash,
2839            data: manifest_data.clone(),
2840            offset: 0,
2841        };
2842        store.put_chunk(&chunk)?;
2843        let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2844        if !json {
2845            info!(
2846                "Fetching file: {} (compression: {})",
2847                manifest.name, manifest.compression
2848            );
2849        }
2850        for cid in &manifest.chunks {
2851            chunk_compression.insert(cid.clone(), manifest.compression.clone());
2852        }
2853        all_chunk_ids.extend(manifest.chunks.clone());
2854        file_manifests.push(manifest);
2855    }
2856
2857    // 3. Resume: recover from partial directory if previous transfer was interrupted
2858    let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2859    let partial_chunks = partial.list_chunks()?;
2860    let mut recovered = 0usize;
2861    for chunk_id in &partial_chunks {
2862        if let Ok(data) = partial.load_chunk(chunk_id) {
2863            let hash = blake3::hash(&data);
2864            if hash.to_hex().to_string() == *chunk_id {
2865                // Recovered chunk matches — save to store
2866                let chunk = crate::chunker::Chunk {
2867                    hash,
2868                    data: data.clone(),
2869                    offset: 0,
2870                };
2871                if store.put_chunk(&chunk).is_ok() {
2872                    recovered += 1;
2873                }
2874            } else {
2875                // Corrupted partial chunk — remove and re-fetch
2876                let _ = partial.remove_chunk(chunk_id);
2877            }
2878        }
2879    }
2880    if recovered > 0 {
2881        info!("Recovered {} chunks from partial transfer", recovered);
2882    }
2883
2884    // 4. Fetch all missing chunks (not in store and not in partial)
2885    let needed_chunks: Vec<String> = all_chunk_ids
2886        .into_iter()
2887        .filter(|id| store.get_chunk(id).is_err())
2888        .collect();
2889
2890    if !needed_chunks.is_empty() {
2891        if !json {
2892            info!("Fetching {} chunks...", needed_chunks.len());
2893        }
2894        let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2895            .iter()
2896            .map(|id| {
2897                (
2898                    id.clone(),
2899                    shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2900                )
2901            })
2902            .collect();
2903        let chunk_results = node
2904            .request_parallel(&multiaddr, peer_id, chunk_requests)
2905            .await?;
2906        for (chunk_id, chunk_data) in &chunk_results {
2907            // Determine compression from the manifest this chunk belongs to
2908            let compression: Compression = chunk_compression
2909                .get(chunk_id)
2910                .map(|s| s.as_str())
2911                .unwrap_or("none")
2912                .parse()?;
2913            // Decrypt (if private) then decompress to verify the content hash
2914            let decrypted = match &cipher {
2915                Some(c) => c.decrypt(chunk_data)?,
2916                None => chunk_data.clone(),
2917            };
2918            let decompressed = compression.decompress(&decrypted)?;
2919            let hash = blake3::hash(&decompressed);
2920            if hash.to_hex().to_string() != *chunk_id {
2921                anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2922            }
2923            // Store the data as received (encrypted for private repos)
2924            let chunk = crate::chunker::Chunk {
2925                hash,
2926                data: chunk_data.clone(),
2927                offset: 0,
2928            };
2929            store.put_chunk(&chunk)?;
2930            // Save to partial for resume support
2931            partial.save_chunk(chunk_id, chunk_data)?;
2932        }
2933    }
2934
2935    // 5. Reconstruct all files
2936    for manifest in &file_manifests {
2937        let compression = manifest.compression.parse::<Compression>()?;
2938        let mut file_data = Vec::new();
2939        for chunk_id in &manifest.chunks {
2940            let stored = store.get_chunk(chunk_id)?;
2941            let decrypted = match &cipher {
2942                Some(c) => c.decrypt(&stored)?,
2943                None => stored,
2944            };
2945            let decompressed = compression.decompress(&decrypted)?;
2946            file_data.extend_from_slice(&decompressed);
2947        }
2948        fs::write(path.join(&manifest.name), file_data)?;
2949        if !json {
2950            info!(
2951                "Reconstructed file: {} ({} bytes)",
2952                manifest.name, manifest.size
2953            );
2954        }
2955    }
2956
2957    // 6. Clean up partial transfer tracking
2958    partial.cleanup()?;
2959
2960    if json {
2961        info!(
2962            "{}",
2963            serde_json::to_string(&serde_json::json!({
2964                "status": "pull complete",
2965                "commit_id": commit_id,
2966            }))?
2967        );
2968    } else {
2969        info!("Pull complete.");
2970    }
2971    Ok(())
2972}
2973
2974pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2975    let shard_dir = path.join(".shard");
2976    if !shard_dir.exists() {
2977        anyhow::bail!("not a shard repository (run `shard init` first)");
2978    }
2979    let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2980    if json {
2981        info!("{}", serde_json::to_string(&transfers)?);
2982    } else {
2983        if transfers.is_empty() {
2984            info!("No incomplete transfers.");
2985        } else {
2986            for t in &transfers {
2987                info!("Incomplete transfer: {}", t);
2988            }
2989        }
2990    }
2991    Ok(())
2992}
2993
2994pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2995    let shard_dir = path.join(".shard");
2996    if !shard_dir.exists() {
2997        anyhow::bail!("not a shard repository (run `shard init` first)");
2998    }
2999    partial::remove_transfer(&shard_dir, commit_id)?;
3000    info!("Removed transfer tracking for {}", commit_id);
3001    Ok(())
3002}
3003
3004pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
3005    let _guard = OP_QUEUES
3006        .get_or_create(&path.to_path_buf())
3007        .acquire(ops::OpKind::Write, "push".to_string());
3008    let shard_dir = path.join(".shard");
3009    if !shard_dir.exists() {
3010        anyhow::bail!("not a shard repository (run `shard init` first)");
3011    }
3012
3013    let (_, head_id) = branch::resolve_head(&shard_dir)?;
3014    let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
3015
3016    let store = Store::open(&shard_dir)?;
3017
3018    // Collect all reachable objects
3019    let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
3020        std::collections::BTreeMap::new();
3021
3022    // Walk commits
3023    let mut seen = std::collections::HashSet::new();
3024    let mut stack = vec![head_id.clone()];
3025    while let Some(cid) = stack.pop() {
3026        if !seen.insert(cid.clone()) {
3027            continue;
3028        }
3029        if let Ok(data) = store.get_chunk(&cid) {
3030            objects.insert(cid, data.clone());
3031            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
3032                // Include key rotation records for this commit's key chain
3033                if let Some(kid) = &commit.key_id {
3034                    let keys_dir = shard_dir.join("keys");
3035                    if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
3036                        for rot in &chain {
3037                            let rj = serde_json::to_vec(rot)?;
3038                            let rh = blake3::hash(&rj);
3039                            if !store.has_chunk(rh.to_hex().as_ref()) {
3040                                store.put_chunk(&crate::chunker::Chunk {
3041                                    hash: rh,
3042                                    data: rj.clone(),
3043                                    offset: 0,
3044                                })?;
3045                            }
3046                            objects.insert(rot.rotation_id.clone(), rj);
3047                        }
3048                    }
3049                }
3050                for mid in &commit.manifests {
3051                    if let Ok(manifest_data) = store.get_chunk(mid) {
3052                        objects.insert(mid.clone(), manifest_data.clone());
3053                        if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
3054                        {
3055                            for cid in &manifest.chunks {
3056                                if let Ok(chunk_data) = store.get_chunk(cid) {
3057                                    objects.insert(cid.clone(), chunk_data);
3058                                }
3059                            }
3060                        }
3061                    }
3062                }
3063                for parent in &commit.parents {
3064                    stack.push(parent.clone());
3065                }
3066            }
3067        }
3068    }
3069
3070    if !json {
3071        info!(
3072            "Pushing {} objects ({} bytes)...",
3073            objects.len(),
3074            objects.values().map(|v| v.len() as u64).sum::<u64>()
3075        );
3076    }
3077
3078    // Connect and send all objects
3079    let mut node = shard_net::p2p::Node::new().await?;
3080    let _ = node.listen("/ip4/0.0.0.0/tcp/0").await;
3081    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
3082    let peer_id = match multiaddr.iter().last() {
3083        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
3084        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
3085    };
3086
3087    for (id, data) in &objects {
3088        node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
3089            .await?;
3090    }
3091
3092    if json {
3093        info!(
3094            "{}",
3095            serde_json::to_string(&serde_json::json!({
3096                "status": "push complete",
3097                "objects": objects.len(),
3098            }))?
3099        );
3100    } else {
3101        info!("Push complete ({} objects).", objects.len());
3102    }
3103    Ok(())
3104}
3105
3106/// Rotate the signing key: generates a new ed25519 keypair, archives the old
3107/// one, and persists a signed rotation record.
3108pub fn key_rotate(path: &Path) -> Result<()> {
3109    let shard_dir = path.join(".shard");
3110    if !shard_dir.exists() {
3111        anyhow::bail!("not a shard repository (run `shard init` first)");
3112    }
3113    let keys_dir = shard_dir.join("keys");
3114    let rotation = keychain::rotate_signing_key(&keys_dir)?;
3115
3116    // Also update global keyring
3117    if let Some(global) = global_keys_dir() {
3118        fs::create_dir_all(&global).ok();
3119        if let Ok(new_keys) = KeyPair::load(&keys_dir) {
3120            let _ = new_keys.save(&global);
3121        }
3122    }
3123
3124    // Store rotation record as a content-addressed chunk in the DAG
3125    let store = Store::open(&shard_dir)?;
3126    let json = serde_json::to_vec(&rotation)?;
3127    let hash = blake3::hash(&json);
3128    if !store.has_chunk(hash.to_hex().as_ref()) {
3129        store.put_chunk(&crate::chunker::Chunk {
3130            hash,
3131            data: json,
3132            offset: 0,
3133        })?;
3134    }
3135
3136    // Replicate all rotation records as chunks for P2P availability
3137    let rotations = keychain::load_rotations(&keys_dir)?;
3138    for rot in &rotations {
3139        let rj = serde_json::to_vec(rot)?;
3140        let rh = blake3::hash(&rj);
3141        if !store.has_chunk(rh.to_hex().as_ref()) {
3142            store.put_chunk(&crate::chunker::Chunk {
3143                hash: rh,
3144                data: rj,
3145                offset: 0,
3146            })?;
3147        }
3148    }
3149
3150    info!(
3151        "Key rotated: {} -> {}",
3152        rotation.old_key_id, rotation.new_key_id
3153    );
3154    info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
3155    Ok(())
3156}
3157
3158/// List all keys in the keychain with their validity info.
3159pub fn key_list(path: &Path, json: bool) -> Result<()> {
3160    let shard_dir = path.join(".shard");
3161    if !shard_dir.exists() {
3162        anyhow::bail!("not a shard repository (run `shard init` first)");
3163    }
3164    let keys_dir = shard_dir.join("keys");
3165    let records = keychain::load_records(&keys_dir)?;
3166    let current_id = keychain::get_current_key_id(&keys_dir)?;
3167
3168    if json {
3169        info!(
3170            "{}",
3171            serde_json::to_string_pretty(&serde_json::json!({
3172                "current": current_id,
3173                "records": &records,
3174            }))?
3175        );
3176    } else {
3177        info!("Current key: {}", current_id);
3178        info!("Key history:");
3179        for record in &records {
3180            let marker = if record.key_id == current_id {
3181                " (active)"
3182            } else {
3183                ""
3184            };
3185            info!(
3186                "  {}  created_at={}{}",
3187                record.key_id, record.created_at, marker
3188            );
3189            if let Some(prev) = &record.previous_key_id {
3190                info!("    previous: {}", prev);
3191            }
3192        }
3193    }
3194    Ok(())
3195}
3196
3197/// Verify the integrity of the keychain: check every rotation's signature.
3198pub fn key_verify(path: &Path, json: bool) -> Result<()> {
3199    let shard_dir = path.join(".shard");
3200    if !shard_dir.exists() {
3201        anyhow::bail!("not a shard repository (run `shard init` first)");
3202    }
3203    let keys_dir = shard_dir.join("keys");
3204    let errors = keychain::verify_keychain(&keys_dir)?;
3205
3206    if json {
3207        info!(
3208            "{}",
3209            serde_json::to_string(&serde_json::json!({
3210                "verified": errors.is_empty(),
3211                "errors": errors,
3212            }))?
3213        );
3214    } else {
3215        if errors.is_empty() {
3216            info!("Keychain verification successful.");
3217        } else {
3218            for err in &errors {
3219                error!("Keychain error: {}", err);
3220            }
3221            anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
3222        }
3223    }
3224    Ok(())
3225}