Skip to main content

shard_core/
lib.rs

1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod encryption;
6pub mod index;
7pub mod keychain;
8pub mod manifest;
9pub mod metadata;
10pub mod partial;
11pub mod store;
12pub mod wal;
13
14use crate::commit::Commit;
15use crate::compression::Compression;
16use crate::index::Index;
17use crate::keychain::KeyRotation;
18use crate::manifest::FileManifest;
19use crate::store::Store;
20use anyhow::Result;
21use ed25519_dalek::{Signer, Verifier};
22use metadata::MetadataFormat;
23use serde::Serialize;
24use shard_crypto::KeyPair;
25use shard_net::libp2p::futures::StreamExt;
26use similar::TextDiff;
27use std::collections::HashMap;
28use std::fs;
29use std::io::Write;
30use std::path::Path;
31use std::path::PathBuf;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33use tracing::{error, info, warn};
34
/// Reads the ignore patterns listed in `<path>/.shardignore`.
///
/// Blank lines and lines whose first non-whitespace character is `#` are
/// dropped; every remaining line is returned trimmed, in file order.
/// A missing or unreadable file yields an empty list instead of an error.
fn load_shardignore(path: &Path) -> Vec<String> {
    let ignore_path = path.join(".shardignore");
    if !ignore_path.exists() {
        return Vec::new();
    }

    // Best effort: a read failure is treated exactly like an empty file.
    let contents = fs::read_to_string(&ignore_path).unwrap_or_default();

    let mut patterns = Vec::new();
    for raw in contents.lines() {
        let line = raw.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        patterns.push(line.to_string());
    }
    patterns
}
48
/// Reports whether `path` is excluded by any of the `.shardignore` `patterns`.
///
/// A trailing `/` on a pattern is ignored. Matching rules:
///
/// * `*` excludes every path.
/// * `**/name` excludes any path in which `name` appears as a complete run of
///   path components: `**/build` matches `build`, `a/build` and
///   `a/build/out.o`, but not `src/rebuild.rs`.
/// * Any other pattern excludes a path that equals it or ends with it on a
///   component boundary: `target` matches `x/target` but not `x/mytarget`.
///
/// No other wildcard syntax is interpreted. A pattern that is empty once its
/// trailing slashes are removed never matches.
fn matches_ignore(path: &Path, patterns: &[String]) -> bool {
    // Patterns are written with `/`; normalise the native separator so they
    // also apply on platforms where it differs (a no-op where it is `/`).
    let path_str = path
        .to_string_lossy()
        .replace(std::path::MAIN_SEPARATOR, "/");
    // Wrapping the path in slashes lets plain suffix/substring searches
    // respect component boundaries.
    let anchored = format!("/{}", path_str);
    let delimited = format!("/{}/", path_str);

    patterns.iter().any(|pattern| {
        let pattern = pattern.trim_end_matches('/');
        if pattern.is_empty() {
            // A line such as `/` would otherwise equal the empty relative path
            // of a walk root and silently exclude the entire tree.
            return false;
        }
        if pattern == "*" {
            return true;
        }
        match pattern.strip_prefix("**/") {
            Some(glob) => !glob.is_empty() && delimited.contains(&format!("/{}/", glob)),
            None => anchored.ends_with(&format!("/{}", pattern)),
        }
    })
}
66
67pub fn init(
68    path: &Path,
69    backend: &str,
70    compression_algo: &str,
71    chunker_mode: &str,
72    chunk_size: Option<u64>,
73    is_private: bool,
74    json: bool,
75) -> Result<()> {
76    let shard_dir = path.join(".shard");
77    if shard_dir.exists() {
78        anyhow::bail!(
79            "repository already initialized at {} (run `shard status` to confirm)",
80            shard_dir.display()
81        );
82    }
83    fs::create_dir_all(shard_dir.join("objects"))?;
84    fs::create_dir_all(shard_dir.join("keys"))?;
85    fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
86    branch::set_head_branch(&shard_dir, "main")?;
87
88    let keys = KeyPair::generate();
89    keys.save(&shard_dir.join("keys"))?;
90    if let Some(global) = global_keys_dir() {
91        fs::create_dir_all(&global).ok();
92        let _ = keys.save(&global);
93        let _ = keychain::init_keychain(&global);
94    }
95    keychain::init_keychain(&shard_dir.join("keys"))?;
96
97    // Generate a deterministic repo identity from the public key
98    // (same key = same repo_id, so clones share the gossipsub topic)
99    let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
100    let repo_id = blake3::hash(&pubkey).to_hex().to_string();
101    let mut config = load_config(&shard_dir)?;
102
103    if is_private {
104        let key = encryption::generate_repo_key();
105        encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
106        config.insert("private".to_string(), "true".to_string());
107    }
108    config.insert("repo_id".to_string(), repo_id);
109    config.insert("storage_backend".to_string(), backend.to_string());
110    config.insert(
111        "serialization_format".to_string(),
112        MetadataFormat::Json.config_value().to_string(),
113    );
114    config.insert("compression".to_string(), compression_algo.to_string());
115    config.insert("chunker_mode".to_string(), chunker_mode.to_string());
116    match chunker_mode {
117        "rabin" => {
118            let chunk_size = chunk_size.unwrap_or(4_194_304);
119            let min = chunk_size / 4;
120            let max = chunk_size * 2;
121            config.insert("chunk_min".to_string(), min.to_string());
122            config.insert("chunk_avg".to_string(), chunk_size.to_string());
123            config.insert("chunk_max".to_string(), max.to_string());
124        }
125        _ => {
126            let cs = chunk_size.unwrap_or(4_194_304);
127            config.insert("chunk_size".to_string(), cs.to_string());
128        }
129    }
130    save_config(&shard_dir, &config)?;
131
132    let chunker_desc = if chunker_mode == "rabin" {
133        format!(
134            "rabin (avg {} bytes)",
135            config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
136        )
137    } else {
138        format!(
139            "fixed ({} bytes)",
140            config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
141        )
142    };
143    if json {
144        info!(
145            "{}",
146            serde_json::to_string(&serde_json::json!({
147                "path": shard_dir.display().to_string(),
148                "backend": backend,
149                "compression": compression_algo,
150                "chunker": chunker_desc,
151                "private": is_private,
152            }))?
153        );
154    } else {
155        info!(
156            "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
157            shard_dir.display(),
158            backend,
159            compression_algo,
160            chunker_desc,
161        );
162    }
163    Ok(())
164}
165
/// Expresses `file_path` relative to `repo_root`, as a string.
///
/// Each path is canonicalized when possible and used as given otherwise.
/// If the file does not lie under the repository root, only the final
/// component of `file_path` is returned; if it has none, the result is empty.
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let resolve = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let root = resolve(repo_root);
    let target = resolve(file_path);

    match target.strip_prefix(&root) {
        Ok(inside) => inside.to_string_lossy().to_string(),
        // Outside the repository: fall back to the bare file name.
        Err(_) => match file_path.file_name() {
            Some(leaf) => leaf.to_string_lossy().to_string(),
            None => String::new(),
        },
    }
}
182
/// Guesses a MIME type from the extension of `file_path`.
///
/// The extension is compared case-insensitively (it is lower-cased first).
/// Returns `None` when the path has no extension, the extension is not valid
/// UTF-8, or it is not one of the known extensions in the table below.
fn detect_content_type(file_path: &Path) -> Option<String> {
    /// Known extensions (lower-case) grouped by the MIME type they map to.
    const MIME_BY_EXTENSION: &[(&[&str], &str)] = &[
        (&["txt"], "text/plain"),
        (&["json"], "application/json"),
        (&["csv"], "text/csv"),
        (&["png"], "image/png"),
        (&["jpg", "jpeg"], "image/jpeg"),
        (&["gif"], "image/gif"),
        (&["pdf"], "application/pdf"),
        (&["yaml", "yml"], "application/x-yaml"),
        (&["md"], "text/markdown"),
        (&["html", "htm"], "text/html"),
        (&["py"], "text/x-python"),
        (&["rs"], "text/x-rust"),
        (&["ts"], "text/x-typescript"),
        (&["js"], "application/javascript"),
        (&["wasm"], "application/wasm"),
        (&["toml"], "application/toml"),
        (&["xml"], "application/xml"),
        (&["zip"], "application/zip"),
        (&["tar"], "application/x-tar"),
        (&["gz"], "application/gzip"),
        (&["bin"], "application/octet-stream"),
        (&["pt", "pth", "ckpt", "safetensors"], "application/x-model"),
    ];

    let ext = file_path.extension()?.to_str()?.to_lowercase();
    MIME_BY_EXTENSION
        .iter()
        .find(|(candidates, _)| candidates.iter().any(|&c| c == ext.as_str()))
        .map(|&(_, mime)| mime.to_string())
}
212
213#[allow(clippy::too_many_arguments)]
214fn add_file(
215    repo_root: &Path,
216    file_path: &Path,
217    store: &Store,
218    index: &mut Index,
219    compression: &Compression,
220    chunker_mode: &chunker::ChunkerMode,
221    cipher: Option<&encryption::RepoCipher>,
222    _json: bool,
223) -> Result<()> {
224    let file = fs::File::open(file_path)?;
225    let mut chunker = match chunker_mode {
226        chunker::ChunkerMode::Fixed { chunk_size } => {
227            chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
228        }
229        chunker::ChunkerMode::Rabin { min, avg, max } => {
230            chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
231        }
232    };
233    let mut chunk_hashes = Vec::new();
234    let mut total_size = 0;
235
236    while let Some(chunk) = chunker.next_chunk()? {
237        let hash = chunk.hash;
238        let compressed_data = compression.compress(&chunk.data)?;
239        let stored_data = match cipher {
240            Some(c) => c.encrypt(&compressed_data),
241            None => compressed_data,
242        };
243        let stored = crate::chunker::Chunk {
244            hash,
245            data: stored_data,
246            offset: chunk.offset,
247        };
248        store.put_chunk(&stored)?;
249        chunk_hashes.push(hash.to_hex().to_string());
250        total_size += chunk.data.len() as u64;
251    }
252
253    let name = relative_path(repo_root, file_path);
254    let manifest = FileManifest {
255        name: name.clone(),
256        size: total_size,
257        chunks: chunk_hashes.clone(),
258        content_type: detect_content_type(file_path),
259        compression: compression.as_str().to_string(),
260        merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
261        created_by: None,
262        created_at: None,
263        signature: None,
264    };
265
266    index.files.insert(name.clone(), manifest);
267    if !_json {
268        info!("Added {} ({})", name, total_size);
269    }
270    Ok(())
271}
272
273pub fn recover(path: &Path, json: bool) -> Result<()> {
274    let shard_dir = path.join(".shard");
275    if !shard_dir.exists() {
276        anyhow::bail!("not a shard repository (run `shard init` first)");
277    }
278    wal::recover(&shard_dir)?;
279    if json {
280        info!(
281            "{}",
282            serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
283        );
284    } else {
285        info!("Recovery complete.");
286    }
287    Ok(())
288}
289
290pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
291    let shard_dir = path.join(".shard");
292    if !shard_dir.exists() {
293        anyhow::bail!("not a shard repository (run `shard init` first)");
294    }
295
296    wal::recover(&shard_dir)?;
297
298    let config = load_config(&shard_dir)?;
299    let compression: Compression = config
300        .get("compression")
301        .map(|s| s.as_str())
302        .unwrap_or("zstd")
303        .parse()?;
304
305    let chunker_mode = chunker::ChunkerMode::from_config(&config);
306    let fmt = MetadataFormat::from_config(&config);
307
308    let store = Store::open(&shard_dir)?;
309    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
310
311    let cipher = maybe_load_cipher(&shard_dir)?;
312    let ignore_patterns = load_shardignore(path);
313
314    if file_path.is_dir() {
315        for entry in walkdir::WalkDir::new(file_path)
316            .into_iter()
317            .filter_entry(|e| {
318                let name = e.file_name().to_string_lossy();
319                if name == ".shard" || name == ".git" {
320                    return false;
321                }
322                let rel = e.path().strip_prefix(file_path).unwrap_or(e.path());
323                if matches_ignore(rel, &ignore_patterns) {
324                    return false;
325                }
326                true
327            })
328        {
329            let entry = entry?;
330            if entry.file_type().is_file() {
331                add_file(
332                    path,
333                    entry.path(),
334                    &store,
335                    &mut index,
336                    &compression,
337                    &chunker_mode,
338                    cipher.as_ref(),
339                    json,
340                )?;
341            }
342        }
343    } else {
344        add_file(
345            path,
346            file_path,
347            &store,
348            &mut index,
349            &compression,
350            &chunker_mode,
351            cipher.as_ref(),
352            json,
353        )?;
354    }
355
356    index.save(&shard_dir.join("index"), &fmt)?;
357    if json {
358        info!(
359            "{}",
360            serde_json::to_string(&serde_json::json!({"status": "added"}))?
361        );
362    }
363    Ok(())
364}
365
366pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
367    let shard_dir = path.join(".shard");
368    if !shard_dir.exists() {
369        anyhow::bail!("not a shard repository (run `shard init` first)");
370    }
371
372    // Recover from any previous crash before mutating
373    wal::recover(&shard_dir)?;
374
375    let config = load_config(&shard_dir)?;
376    let fmt = MetadataFormat::from_config(&config);
377
378    let store = Store::open(&shard_dir)?;
379    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
380
381    if index.files.is_empty() {
382        anyhow::bail!("nothing to commit (stage files with `shard add` first)");
383    }
384
385    let head_path = shard_dir.join("HEAD");
386
387    // WAL: back up pre-commit state
388    let wal = wal::Wal::new(&shard_dir);
389    let head_backup = fs::read_to_string(&head_path).ok();
390    let index_backup = fs::read(shard_dir.join("index"))?;
391    wal.append(&wal::WalEntry::CommitBegin {
392        head_backup,
393        index_backup,
394    })?;
395
396    // 1. Store manifests (signed)
397    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
398    let keys = load_keypair(&shard_dir)?;
399    let signing_key = keys.signing_key;
400    let mut manifest_ids = Vec::new();
401    for manifest in index.files.values_mut() {
402        manifest.created_by = Some(author.to_string());
403        manifest.created_at = Some(timestamp);
404
405        let mut unsigned = manifest.clone();
406        unsigned.signature = None;
407        let canonical = metadata::serialize_for_signing(&unsigned);
408        let sig = signing_key.sign(&canonical);
409        manifest.signature = Some(hex::encode(sig.to_bytes()));
410
411        let encoded = metadata::serialize(manifest, &fmt);
412        let hash = blake3::hash(&encoded);
413        let chunk = crate::chunker::Chunk {
414            hash,
415            data: encoded,
416            offset: 0,
417        };
418        store.put_chunk(&chunk)?;
419        manifest_ids.push(hash.to_hex().to_string());
420    }
421    manifest_ids.sort();
422
423    // 2. Get parent
424    let mut parents = Vec::new();
425    let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
426    if let Some(pid) = parent_id {
427        parents.push(pid);
428    }
429
430    // 3. Create commit
431    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
432    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
433    let mut commit = Commit {
434        commit_id: String::new(),
435        parents,
436        manifests: manifest_ids,
437        author: author.to_string(),
438        message: message.to_string(),
439        timestamp,
440        public_key: Some(public_key_hex),
441        signature: None,
442        key_id,
443    };
444
445    // 4. Sign — always JSON for deterministic signature
446    let json_unsigned = metadata::serialize_for_signing(&commit);
447    let signature = signing_key.sign(&json_unsigned);
448    commit.signature = Some(hex::encode(signature.to_bytes()));
449
450    // 5. Store commit — use configured format
451    let encoded = metadata::serialize(&commit, &fmt);
452    let hash = blake3::hash(&encoded);
453    let chunk = crate::chunker::Chunk {
454        hash,
455        data: encoded,
456        offset: 0,
457    };
458    store.put_chunk(&chunk)?;
459
460    // 6. Cycle detection: verify no parent chain already contains this commit
461    let commit_id = hash.to_hex().to_string();
462    if has_dag_cycle(&store, &commit.parents, &commit_id)? {
463        anyhow::bail!(
464            "Cycle detected: commit {} is already an ancestor of one or more parents",
465            commit_id
466        );
467    }
468
469    // 7. Update HEAD and branch ref
470    if let Some(ref branch_name) = current_branch {
471        branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
472        branch::set_head_branch(&shard_dir, branch_name)?;
473    } else {
474        fs::write(&head_path, &commit_id)?;
475    }
476
477    // 7. Clear index
478    index.files.clear();
479    index.save(&shard_dir.join("index"), &fmt)?;
480
481    // WAL: mark commit complete
482    wal.append(&wal::WalEntry::CommitEnd)?;
483    wal.truncate()?;
484
485    if json {
486        info!(
487            "{}",
488            serde_json::to_string(&serde_json::json!({
489                "commit_id": commit_id,
490                "message": message,
491            }))?
492        );
493    } else {
494        info!("Committed {} ({})", commit_id, message);
495    }
496    Ok(())
497}
498
499pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
500    let shard_dir = path.join(".shard");
501    if !shard_dir.exists() {
502        anyhow::bail!("not a shard repository (run `shard init` first)");
503    }
504
505    if commit_id.len() < 2 {
506        anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
507    }
508    let store = Store::open(&shard_dir)?;
509    let cipher = maybe_load_cipher(&shard_dir)?;
510    let commit_data = store.get_chunk(commit_id)?;
511
512    // Self-verify: stored blob hash must equal commit_id (M5)
513    let stored_hash = blake3::hash(&commit_data);
514    if stored_hash.to_hex().to_string() != commit_id {
515        anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
516    }
517
518    let commit: Commit = metadata::deserialize(&commit_data)?;
519
520    let mut sig_verified = false;
521    let mut files_checked = 0u64;
522
523    // Verify the signing key was valid at commit time
524    if let Some(kid) = &commit.key_id {
525        if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
526            anyhow::bail!("Keychain verification failed: {}", e);
527        } else if !json {
528            info!("Keychain: key {} was active at commit time.", kid);
529        }
530    }
531
532    if let Some(sig_hex) = &commit.signature {
533        let verifying_key = if let Some(pk_hex) = &commit.public_key {
534            let pk_bytes = hex::decode(pk_hex)?;
535            ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
536        } else {
537            let pub_bytes = load_public_key(&shard_dir)?;
538            ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
539        };
540
541        let mut unsigned_commit = commit.clone();
542        unsigned_commit.signature = None;
543        let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);
544
545        let sig_bytes = hex::decode(sig_hex)?;
546        let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
547
548        verifying_key.verify(&json_unsigned, &signature)?;
549        sig_verified = true;
550        if !json {
551            info!("Signature verified.");
552        }
553    } else if !json {
554        info!("Warning: Commit is unsigned.");
555    }
556
557    for manifest_id in &commit.manifests {
558        let manifest_data = store.get_chunk(manifest_id)?;
559        let hash = blake3::hash(&manifest_data);
560        if hash.to_hex().to_string() != *manifest_id {
561            anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
562        }
563
564        let manifest: FileManifest = metadata::deserialize(&manifest_data)?;
565
566        // Verify manifest signature (defense-in-depth; commit signature already covers manifest_id)
567        if let Some(sig_hex) = &manifest.signature {
568            let pk_bytes = if let Some(pk_hex) = &commit.public_key {
569                hex::decode(pk_hex)?
570            } else {
571                load_public_key(&shard_dir)?
572            };
573            let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
574            let mut unsigned = manifest.clone();
575            unsigned.signature = None;
576            let canonical = metadata::serialize_for_signing(&unsigned);
577            let sig_bytes = hex::decode(sig_hex)?;
578            let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
579            vk.verify(&canonical, &sig)?;
580            if !json {
581                info!("  Manifest signature verified for: {}", manifest.name);
582            }
583        }
584
585        let compression = manifest.compression.parse::<Compression>()?;
586        if !json {
587            info!(
588                "Verifying file: {} (compression: {})",
589                manifest.name, manifest.compression
590            );
591        }
592
593        // Verify merkle_root if present
594        if let Some(ref mr) = manifest.merkle_root {
595            let computed = FileManifest::merkle_root(&manifest.chunks);
596            if mr != &computed {
597                anyhow::bail!(
598                    "merkle root mismatch for '{}': manifest says {} but computed {}",
599                    manifest.name,
600                    mr,
601                    computed
602                );
603            }
604        }
605
606        for chunk_id in &manifest.chunks {
607            let chunk_data = store.get_chunk(chunk_id)?;
608            let decrypted = match &cipher {
609                Some(c) => c.decrypt(&chunk_data)?,
610                None => chunk_data,
611            };
612            let decompressed = compression.decompress(&decrypted)?;
613            let hash = blake3::hash(&decompressed);
614            if hash.to_hex().to_string() != *chunk_id {
615                anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
616            }
617        }
618        files_checked += 1;
619    }
620
621    if json {
622        info!(
623            "{}",
624            serde_json::to_string(&serde_json::json!({
625                "commit_id": commit_id,
626                "verified": true,
627                "signature_verified": sig_verified,
628                "files_checked": files_checked,
629            }))?
630        );
631    } else {
632        info!("Verification successful.");
633    }
634    Ok(())
635}
636
637fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
638    if commit_id.len() < 2 {
639        anyhow::bail!(
640            "commit id too short (got {} chars, need at least 2): '{}'",
641            commit_id.len(),
642            commit_id
643        );
644    };
645    let data = store.get_chunk(commit_id)?;
646    let mut commit: Commit = metadata::deserialize(&data)?;
647    commit.commit_id = commit_id.to_string();
648    Ok(commit)
649}
650
651fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
652    let mut seen = std::collections::HashSet::new();
653    let mut stack: Vec<String> = parents.to_vec();
654    while let Some(cid) = stack.pop() {
655        if cid == commit_id {
656            return Ok(true);
657        }
658        if !seen.insert(cid.clone()) {
659            continue;
660        }
661        if let Ok(data) = store.get_chunk(&cid) {
662            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
663                for p in &commit.parents {
664                    stack.push(p.clone());
665                }
666            }
667        }
668    }
669    Ok(false)
670}
671
672#[derive(Serialize)]
673pub struct LogEntry {
674    pub commit_id: String,
675    pub parents: Vec<String>,
676    pub manifests: Vec<String>,
677    pub author: String,
678    pub message: String,
679    pub timestamp: u64,
680    pub signature: Option<String>,
681}
682
683impl From<Commit> for LogEntry {
684    fn from(c: Commit) -> Self {
685        LogEntry {
686            commit_id: c.commit_id,
687            parents: c.parents,
688            manifests: c.manifests,
689            author: c.author,
690            message: c.message,
691            timestamp: c.timestamp,
692            signature: c.signature,
693        }
694    }
695}
696
697pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
698    let shard_dir = path.join(".shard");
699    if !shard_dir.exists() {
700        anyhow::bail!("not a shard repository (run `shard init` first)");
701    }
702
703    let store = Store::open(&shard_dir)?;
704
705    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
706    let head = head_commit
707        .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
708
709    let mut entries: Vec<LogEntry> = Vec::new();
710    let mut seen = std::collections::HashSet::new();
711    let mut stack = vec![head];
712
713    while let Some(cid) = stack.pop() {
714        if !seen.insert(cid.clone()) {
715            continue;
716        }
717        let commit = load_commit(&store, &cid)?;
718        for parent in &commit.parents {
719            stack.push(parent.clone());
720        }
721        entries.push(commit.into());
722    }
723
724    if json {
725        info!("{}", serde_json::to_string_pretty(&entries)?);
726    } else {
727        for entry in &entries {
728            let ts = {
729                let secs = entry.timestamp as i64;
730                let tm = time::OffsetDateTime::from_unix_timestamp(secs)
731                    .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
732                tm.format(&time::format_description::well_known::Rfc3339)
733                    .unwrap_or_else(|_| entry.timestamp.to_string())
734            };
735            info!("commit {}", entry.commit_id);
736            if !entry.parents.is_empty() {
737                info!("parents: {}", entry.parents.join(" "));
738            }
739            info!("author: {}", entry.author);
740            info!("date:   {}", ts);
741            info!("");
742            for line in entry.message.lines() {
743                info!("    {}", line);
744            }
745            info!("");
746        }
747    }
748
749    Ok(())
750}
751
752fn reconstruct_file(
753    store: &Store,
754    manifest: &FileManifest,
755    cipher: Option<&encryption::RepoCipher>,
756) -> Result<Vec<u8>> {
757    let compression: Compression = manifest.compression.parse()?;
758    let mut data = Vec::new();
759    for chunk_id in &manifest.chunks {
760        let chunk_data = store.get_chunk(chunk_id)?;
761        let decrypted = match cipher {
762            Some(c) => c.decrypt(&chunk_data)?,
763            None => chunk_data,
764        };
765        let decompressed = compression.decompress(&decrypted)?;
766        data.extend_from_slice(&decompressed);
767    }
768    Ok(data)
769}
770
771pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
772    let shard_dir = path.join(".shard");
773    if !shard_dir.exists() {
774        anyhow::bail!("not a shard repository (run `shard init` first)");
775    }
776
777    let store = Store::open(&shard_dir)?;
778    let cipher = maybe_load_cipher(&shard_dir)?;
779
780    let cid_b = match commit_b {
781        Some(c) => branch::resolve_rev(&shard_dir, c)?,
782        None => {
783            let (_, head) = branch::resolve_head(&shard_dir)?;
784            head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
785        }
786    };
787    let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;
788
789    let commit1 = load_commit(&store, &cid_a)?;
790    let commit2 = load_commit(&store, &cid_b)?;
791
792    let mut files1: HashMap<String, FileManifest> = HashMap::new();
793    for mid in &commit1.manifests {
794        let data = store.get_chunk(mid)?;
795        let m: FileManifest = metadata::deserialize(&data)?;
796        files1.insert(m.name.clone(), m);
797    }
798
799    let mut files2: HashMap<String, FileManifest> = HashMap::new();
800    for mid in &commit2.manifests {
801        let data = store.get_chunk(mid)?;
802        let m: FileManifest = metadata::deserialize(&data)?;
803        files2.insert(m.name.clone(), m);
804    }
805
806    let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
807    all_names.sort();
808    all_names.dedup();
809
810    let mut changes: Vec<serde_json::Value> = Vec::new();
811    let mut diff_found = false;
812
813    for name in all_names {
814        match (files1.get(name), files2.get(name)) {
815            (None, Some(manifest)) => {
816                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
817                let text = String::from_utf8_lossy(&content);
818                diff_found = true;
819                if json {
820                    changes.push(serde_json::json!({
821                        "type": "added",
822                        "file": name,
823                        "lines": text.lines().collect::<Vec<_>>(),
824                    }));
825                } else {
826                    info!("--- /dev/null");
827                    info!("+++ b/{}", name);
828                    let lines: Vec<&str> = text.lines().collect();
829                    info!("@@ -0,0 +1,{} @@", lines.len());
830                    for line in &lines {
831                        info!("+{}", line);
832                    }
833                }
834            }
835            (Some(manifest), None) => {
836                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
837                let text = String::from_utf8_lossy(&content);
838                diff_found = true;
839                if json {
840                    changes.push(serde_json::json!({
841                        "type": "removed",
842                        "file": name,
843                        "lines": text.lines().collect::<Vec<_>>(),
844                    }));
845                } else {
846                    info!("--- a/{}", name);
847                    info!("+++ /dev/null");
848                    let lines: Vec<&str> = text.lines().collect();
849                    info!("@@ -1,{} +0,0 @@", lines.len());
850                    for line in &lines {
851                        info!("-{}", line);
852                    }
853                }
854            }
855            (Some(ma), Some(mb)) => {
856                if ma.chunks == mb.chunks {
857                    continue;
858                }
859                let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
860                let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
861                diff_found = true;
862                if json {
863                    let text_a = String::from_utf8_lossy(&content_a);
864                    let text_b = String::from_utf8_lossy(&content_b);
865                    changes.push(serde_json::json!({
866                        "type": "modified",
867                        "file": name,
868                        "old_lines": text_a.lines().collect::<Vec<_>>(),
869                        "new_lines": text_b.lines().collect::<Vec<_>>(),
870                    }));
871                } else {
872                    let text_a = String::from_utf8_lossy(&content_a);
873                    let text_b = String::from_utf8_lossy(&content_b);
874                    let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
875                    let mut buf: Vec<u8> = Vec::new();
876                    diff.unified_diff()
877                        .header(&format!("a/{}", name), &format!("b/{}", name))
878                        .to_writer(&mut buf)
879                        .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
880                    let output = String::from_utf8_lossy(&buf);
881                    for line in output.lines() {
882                        info!("{}", line);
883                    }
884                }
885            }
886            (None, None) => {}
887        }
888    }
889
890    if !diff_found {
891        if json {
892            info!(
893                "{}",
894                serde_json::to_string(
895                    &serde_json::json!({"changes": changes, "message": "no differences"})
896                )?
897            );
898        } else {
899            info!("No differences between the commits.");
900        }
901        return Ok(());
902    }
903
904    if json {
905        info!(
906            "{}",
907            serde_json::to_string(&serde_json::json!({"changes": changes}))?
908        );
909    }
910
911    Ok(())
912}
913
914pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
915    let shard_dir = path.join(".shard");
916    if !shard_dir.exists() {
917        anyhow::bail!("not a shard repository (run `shard init` first)");
918    }
919
920    let store = Store::open(&shard_dir)?;
921    let cipher = maybe_load_cipher(&shard_dir)?;
922
923    // Resolve target: branch name or commit id
924    // Validate BEFORE writing to HEAD to avoid corruption on failure
925    let branch_path = shard_dir.join("refs").join("heads").join(target);
926    let commit_id = if branch_path.exists() {
927        fs::read_to_string(&branch_path)?.trim().to_string()
928    } else {
929        target.to_string()
930    };
931
932    // Validate commit exists before touching HEAD
933    let commit = load_commit(&store, &commit_id)?;
934
935    // Only update HEAD after validation succeeds
936    if branch_path.exists() {
937        branch::set_head_branch(&shard_dir, target)?;
938    } else {
939        branch::set_head_commit(&shard_dir, target)?;
940    }
941    let mut files = Vec::new();
942
943    for manifest_id in &commit.manifests {
944        let data = store.get_chunk(manifest_id)?;
945        let hash = blake3::hash(&data);
946        if hash.to_hex().to_string() != *manifest_id {
947            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
948        }
949        let manifest: FileManifest = metadata::deserialize(&data)?;
950        let compression = manifest.compression.parse::<Compression>()?;
951        if !json {
952            info!(
953                "Checking out file: {} (compression: {})",
954                manifest.name, manifest.compression
955            );
956        }
957
958        let mut file_data = Vec::new();
959        for chunk_id in &manifest.chunks {
960            let chunk_data = store.get_chunk(chunk_id)?;
961            let decrypted = match &cipher {
962                Some(c) => c.decrypt(&chunk_data)?,
963                None => chunk_data,
964            };
965            let decompressed = compression.decompress(&decrypted)?;
966            file_data.extend_from_slice(&decompressed);
967        }
968        fs::write(path.join(&manifest.name), file_data)?;
969        if !json {
970            info!("  -> {}", manifest.name);
971        }
972        files.push(manifest.name);
973    }
974
975    if json {
976        info!(
977            "{}",
978            serde_json::to_string(&serde_json::json!({
979                "commit_id": commit_id,
980                "files": files,
981            }))?
982        );
983    } else {
984        info!("Checkout complete.");
985    }
986    Ok(())
987}
988
989pub fn status(path: &Path, json: bool) -> Result<()> {
990    let shard_dir = path.join(".shard");
991    if !shard_dir.exists() {
992        anyhow::bail!("not a shard repository (run `shard init` first)");
993    }
994
995    let config = load_config(&shard_dir)?;
996    let fmt = MetadataFormat::from_config(&config);
997
998    let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
999    let mut commit_id: Option<String> = None;
1000    if let Some(cid) = head_commit {
1001        commit_id = Some(cid);
1002        if !json {
1003            if let Some(ref branch) = current_branch {
1004                info!("On branch: {}", branch);
1005            } else {
1006                info!("HEAD detached at {}", commit_id.as_ref().unwrap());
1007            }
1008        }
1009    } else if !json {
1010        info!("No commits yet.");
1011    }
1012
1013    let index = Index::load(&shard_dir.join("index"), &fmt)?;
1014    let staged: Vec<String> = index.files.keys().cloned().collect();
1015    if !json {
1016        if staged.is_empty() {
1017            info!("Nothing staged.");
1018        } else {
1019            info!("\nStaged files:");
1020            for name in &staged {
1021                info!("  {} (to be committed)", name);
1022            }
1023        }
1024    }
1025
1026    let store = Store::open(&shard_dir)?;
1027    let mut deleted = Vec::new();
1028    let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
1029        let mut names = std::collections::HashSet::new();
1030        if let Ok(commit) = load_commit(&store, head) {
1031            for manifest_id in &commit.manifests {
1032                if let Ok(data) = store.get_chunk(manifest_id) {
1033                    if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1034                        let file_path = path.join(&manifest.name);
1035                        if !file_path.exists() {
1036                            deleted.push(manifest.name.clone());
1037                        }
1038                        names.insert(manifest.name);
1039                    }
1040                }
1041            }
1042        }
1043        names
1044    } else {
1045        std::collections::HashSet::new()
1046    };
1047
1048    if !json && !deleted.is_empty() {
1049        info!("\nDeleted files:");
1050        for name in &deleted {
1051            info!("  {} (deleted)", name);
1052        }
1053    }
1054
1055    let mut untracked = Vec::new();
1056    let ignore_patterns = load_shardignore(path);
1057    for entry in walkdir::WalkDir::new(path)
1058        .min_depth(1)
1059        .into_iter()
1060        .filter_map(|e| e.ok())
1061    {
1062        if entry.file_type().is_file() || entry.file_type().is_dir() {
1063            let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1064            let name = rel_path.to_string_lossy().to_string();
1065            let is_hidden = rel_path.components().any(|c| {
1066                let name = c.as_os_str().to_string_lossy();
1067                name == ".shard" || name == ".git"
1068            });
1069            if !is_hidden
1070                && entry.path() != shard_dir
1071                && !entry.path().starts_with(&shard_dir)
1072                && !index.files.contains_key(&name)
1073                && !tracked_names.contains(&name)
1074                && entry.file_type().is_file()
1075                && !matches_ignore(rel_path, &ignore_patterns)
1076            {
1077                untracked.push(name);
1078            }
1079        }
1080    }
1081    if !json && !untracked.is_empty() {
1082        info!("\nUntracked files:");
1083        for name in &untracked {
1084            info!("  {}", name);
1085        }
1086    }
1087
1088    if json {
1089        info!(
1090            "{}",
1091            serde_json::to_string(&serde_json::json!({
1092                "commit": commit_id,
1093                "staged": staged,
1094                "deleted": deleted,
1095                "untracked": untracked,
1096            }))?
1097        );
1098    }
1099
1100    Ok(())
1101}
1102
1103fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1104    let config_path = shard_dir.join("config.json");
1105    if config_path.exists() {
1106        let data = fs::read(&config_path)?;
1107        Ok(metadata::deserialize(&data)?)
1108    } else {
1109        Ok(std::collections::BTreeMap::new())
1110    }
1111}
1112
1113fn save_config(
1114    shard_dir: &Path,
1115    config: &std::collections::BTreeMap<String, String>,
1116) -> Result<()> {
1117    let fmt = MetadataFormat::from_config(config);
1118    let data = metadata::serialize(config, &fmt);
1119    fs::write(shard_dir.join("config.json"), data)?;
1120    Ok(())
1121}
1122
/// Returns the per-user key directory, `$HOME/.shard/keys`.
///
/// Yields `None` when `HOME` is unset or empty. An empty `HOME` is rejected because
/// it would otherwise produce the relative path `.shard/keys`, which resolves against
/// the current directory instead of a home directory. The variable is read with
/// `var_os`, so a home path that is not valid Unicode is still honored.
fn global_keys_dir() -> Option<PathBuf> {
    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(|home| Path::new(&home).join(".shard").join("keys"))
}
1128
1129fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1130    let local = shard_dir.join("keys");
1131    if local.join("secret.key").exists() {
1132        return KeyPair::load(&local);
1133    }
1134    if let Some(global) = global_keys_dir() {
1135        if global.join("secret.key").exists() {
1136            return KeyPair::load(&global);
1137        }
1138    }
1139    anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1140}
1141
1142fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1143    let local = shard_dir.join("keys/public.key");
1144    if local.exists() {
1145        return Ok(fs::read(&local)?);
1146    }
1147    if let Some(global) = global_keys_dir() {
1148        let gp = global.join("public.key");
1149        if gp.exists() {
1150            return Ok(fs::read(&gp)?);
1151        }
1152    }
1153    anyhow::bail!(
1154        "no public key found in {} or ~/.shard/keys/",
1155        local.display()
1156    )
1157}
1158
1159fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1160    let config = load_config(shard_dir)?;
1161    if config.get("private").map(|s| s.as_str()) == Some("true") {
1162        let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1163        Ok(Some(encryption::RepoCipher::from_key(&key)))
1164    } else {
1165        Ok(None)
1166    }
1167}
1168
1169pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1170    let shard_dir = path.join(".shard");
1171    if !shard_dir.exists() {
1172        anyhow::bail!("not a shard repository (run `shard init` first)");
1173    }
1174    let config = load_config(&shard_dir)?;
1175    if let Some(key) = key {
1176        match config.get(key) {
1177            Some(value) => info!("{} = {}", key, value),
1178            None => anyhow::bail!("config key not found: {}", key),
1179        }
1180    } else {
1181        for (k, v) in &config {
1182            info!("{} = {}", k, v);
1183        }
1184    }
1185    Ok(())
1186}
1187
1188pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1189    let shard_dir = path.join(".shard");
1190    if !shard_dir.exists() {
1191        anyhow::bail!("not a shard repository (run `shard init` first)");
1192    }
1193    let mut config = load_config(&shard_dir)?;
1194    config.insert(key.to_string(), value.to_string());
1195    save_config(&shard_dir, &config)?;
1196    info!("{} = {}", key, value);
1197    Ok(())
1198}
1199
1200fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1201    let tags_path = shard_dir.join("tags.json");
1202    if tags_path.exists() {
1203        let data = fs::read(&tags_path)?;
1204        Ok(serde_json::from_slice(&data)?)
1205    } else {
1206        Ok(std::collections::BTreeMap::new())
1207    }
1208}
1209
1210fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
1211    let data = serde_json::to_string_pretty(tags)?;
1212    fs::write(shard_dir.join("tags.json"), data)?;
1213    Ok(())
1214}
1215
1216pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1217    let shard_dir = path.join(".shard");
1218    if !shard_dir.exists() {
1219        anyhow::bail!("not a shard repository (run `shard init` first)");
1220    }
1221    // Verify commit exists
1222    let store = Store::open(&shard_dir)?;
1223    load_commit(&store, commit_id)?;
1224    let mut tags = load_tags(&shard_dir)?;
1225    tags.insert(name.to_string(), commit_id.to_string());
1226    save_tags(&shard_dir, &tags)?;
1227    info!("Tagged '{}' -> {}", name, commit_id);
1228    Ok(())
1229}
1230
1231pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1232    let shard_dir = path.join(".shard");
1233    if !shard_dir.exists() {
1234        anyhow::bail!("not a shard repository (run `shard init` first)");
1235    }
1236    let id = match commit_id {
1237        Some(cid) => cid.to_string(),
1238        None => {
1239            let (_, head) = branch::resolve_head(&shard_dir)?;
1240            head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1241        }
1242    };
1243    // Verify commit exists
1244    let store = Store::open(&shard_dir)?;
1245    load_commit(&store, &id)?;
1246    branch::create_branch(&shard_dir, name, &id)
1247}
1248
1249pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1250    let shard_dir = path.join(".shard");
1251    if !shard_dir.exists() {
1252        anyhow::bail!("not a shard repository (run `shard init` first)");
1253    }
1254    branch::delete_branch(&shard_dir, name)
1255}
1256
1257pub fn branch_list(path: &Path) -> Result<()> {
1258    let shard_dir = path.join(".shard");
1259    if !shard_dir.exists() {
1260        anyhow::bail!("not a shard repository (run `shard init` first)");
1261    }
1262    let (current, branches) = branch::list_branches(&shard_dir)?;
1263    if branches.is_empty() {
1264        info!("No branches.");
1265        return Ok(());
1266    }
1267    for (name, commit_id) in &branches {
1268        let prefix = if current.as_deref() == Some(name) {
1269            "* "
1270        } else {
1271            "  "
1272        };
1273        info!(
1274            "{}{} ({})",
1275            prefix,
1276            name,
1277            &commit_id[..8.min(commit_id.len())]
1278        );
1279    }
1280    Ok(())
1281}
1282
1283pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1284    let shard_dir = path.join(".shard");
1285    if !shard_dir.exists() {
1286        anyhow::bail!("not a shard repository (run `shard init` first)");
1287    }
1288
1289    let store = Store::open(&shard_dir)?;
1290
1291    let config = load_config(&shard_dir)?;
1292    let fmt = MetadataFormat::from_config(&config);
1293
1294    // Resolve current HEAD
1295    let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1296    let current_id =
1297        current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1298
1299    // Resolve source branch
1300    let source_id = branch::resolve_rev(&shard_dir, branch)?;
1301    if source_id == current_id {
1302        anyhow::bail!("Already up to date — source is the same commit as HEAD");
1303    }
1304
1305    // Load both commits
1306    let current_commit = load_commit(&store, &current_id)?;
1307    let source_commit = load_commit(&store, &source_id)?;
1308
1309    // Load manifests from both sides
1310    let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1311        std::collections::HashMap::new();
1312
1313    for manifest_id in &current_commit.manifests {
1314        let data = store.get_chunk(manifest_id)?;
1315        let manifest: FileManifest = metadata::deserialize(&data)?;
1316        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1317    }
1318
1319    for manifest_id in &source_commit.manifests {
1320        let data = store.get_chunk(manifest_id)?;
1321        let manifest: FileManifest = metadata::deserialize(&data)?;
1322        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1323    }
1324
1325    // Store merged manifests (signed)
1326    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1327    let keys = load_keypair(&shard_dir)?;
1328    let signing_key = keys.signing_key;
1329    let mut merged_manifest_ids = Vec::new();
1330    for (name, chunks) in merged_manifests.values() {
1331        let compression = Compression::None;
1332        let mut manifest = FileManifest {
1333            name: name.clone(),
1334            size: 0,
1335            chunks: chunks.clone(),
1336            content_type: None,
1337            compression: compression.as_str().to_string(),
1338            merkle_root: Some(FileManifest::merkle_root(chunks)),
1339            created_by: Some(author.to_string()),
1340            created_at: Some(timestamp),
1341            signature: None,
1342        };
1343
1344        let mut unsigned = manifest.clone();
1345        unsigned.signature = None;
1346        let canonical = metadata::serialize_for_signing(&unsigned);
1347        let sig = signing_key.sign(&canonical);
1348        manifest.signature = Some(hex::encode(sig.to_bytes()));
1349
1350        let encoded = metadata::serialize(&manifest, &fmt);
1351        let hash = blake3::hash(&encoded);
1352        store.put_chunk(&crate::chunker::Chunk {
1353            hash,
1354            data: encoded,
1355            offset: 0,
1356        })?;
1357        merged_manifest_ids.push(hash.to_hex().to_string());
1358    }
1359    merged_manifest_ids.sort();
1360
1361    // Create merge commit
1362    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1363    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1364    let parents = vec![current_id.clone(), source_id.clone()];
1365    let mut commit = Commit {
1366        commit_id: String::new(),
1367        parents,
1368        manifests: merged_manifest_ids,
1369        author: author.to_string(),
1370        message: message.to_string(),
1371        timestamp,
1372        public_key: Some(public_key_hex),
1373        signature: None,
1374        key_id,
1375    };
1376
1377    let json_unsigned = metadata::serialize_for_signing(&commit);
1378    let signature = signing_key.sign(&json_unsigned);
1379    commit.signature = Some(hex::encode(signature.to_bytes()));
1380
1381    let encoded = metadata::serialize(&commit, &fmt);
1382    let hash = blake3::hash(&encoded);
1383    store.put_chunk(&crate::chunker::Chunk {
1384        hash,
1385        data: encoded,
1386        offset: 0,
1387    })?;
1388
1389    let merge_commit_id = hash.to_hex().to_string();
1390
1391    // Cycle detection on merge
1392    if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1393        anyhow::bail!(
1394            "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1395            merge_commit_id
1396        );
1397    }
1398
1399    // Update HEAD and branch ref
1400    if let Some(ref branch_name) = current_branch {
1401        branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1402        branch::set_head_branch(&shard_dir, branch_name)?;
1403    } else {
1404        branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1405    }
1406
1407    if json {
1408        info!(
1409            "{}",
1410            serde_json::to_string(&serde_json::json!({
1411                "commit_id": merge_commit_id,
1412                "message": message,
1413            }))?
1414        );
1415    } else {
1416        info!("Merge commit {} ({})", merge_commit_id, message);
1417    }
1418    Ok(())
1419}
1420
1421pub fn tag_list(path: &Path) -> Result<()> {
1422    let shard_dir = path.join(".shard");
1423    if !shard_dir.exists() {
1424        anyhow::bail!("not a shard repository (run `shard init` first)");
1425    }
1426    let tags = load_tags(&shard_dir)?;
1427    if tags.is_empty() {
1428        info!("No tags.");
1429    } else {
1430        for (name, commit_id) in &tags {
1431            info!("{} -> {}", name, commit_id);
1432        }
1433    }
1434    Ok(())
1435}
1436
1437fn collect_reachable(
1438    store: &Store,
1439    commit_id: &str,
1440    seen_commits: &mut std::collections::HashSet<String>,
1441    reachable: &mut std::collections::HashSet<String>,
1442) -> Result<()> {
1443    if !seen_commits.insert(commit_id.to_string()) {
1444        return Ok(());
1445    }
1446
1447    reachable.insert(commit_id.to_string());
1448
1449    let commit = match load_commit(store, commit_id) {
1450        Ok(c) => c,
1451        Err(_) => return Ok(()),
1452    };
1453
1454    for manifest_id in &commit.manifests {
1455        reachable.insert(manifest_id.clone());
1456
1457        if let Ok(data) = store.get_chunk(manifest_id) {
1458            if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1459                for chunk_id in &manifest.chunks {
1460                    reachable.insert(chunk_id.clone());
1461                }
1462            }
1463        }
1464    }
1465
1466    for parent_id in &commit.parents {
1467        collect_reachable(store, parent_id, seen_commits, reachable)?;
1468    }
1469
1470    Ok(())
1471}
1472
1473pub fn prune(path: &Path, json: bool) -> Result<()> {
1474    let shard_dir = path.join(".shard");
1475    if !shard_dir.exists() {
1476        anyhow::bail!("not a shard repository (run `shard init` first)");
1477    }
1478
1479    let config = load_config(&shard_dir)?;
1480    let fmt = MetadataFormat::from_config(&config);
1481
1482    let store = Store::open(&shard_dir)?;
1483    let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1484
1485    // 1. Walk from HEAD commit (and all branch tips)
1486    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1487    if let Some(ref head) = head_commit {
1488        collect_reachable(
1489            &store,
1490            head,
1491            &mut std::collections::HashSet::new(),
1492            &mut reachable,
1493        )?;
1494    }
1495
1496    // Also walk from all branch refs (in case HEAD is detached from any branch)
1497    if let Ok(branches) = branch::list_branches(&shard_dir) {
1498        for (_, commit_id) in branches.1 {
1499            collect_reachable(
1500                &store,
1501                &commit_id,
1502                &mut std::collections::HashSet::new(),
1503                &mut reachable,
1504            )?;
1505        }
1506    }
1507
1508    // 2. Walk from tags
1509    let tags = load_tags(&shard_dir)?;
1510    for commit_id in tags.values() {
1511        collect_reachable(
1512            &store,
1513            commit_id,
1514            &mut std::collections::HashSet::new(),
1515            &mut reachable,
1516        )?;
1517    }
1518
1519    // 3. Walk from index (staged files)
1520    let index = Index::load(&shard_dir.join("index"), &fmt)?;
1521    for manifest in index.files.values() {
1522        let json = metadata::serialize(manifest, &fmt);
1523        let hash = blake3::hash(&json);
1524        let hash_hex = hash.to_hex().to_string();
1525        reachable.insert(hash_hex);
1526        for chunk_hash in &manifest.chunks {
1527            reachable.insert(chunk_hash.clone());
1528        }
1529    }
1530
1531    // 4. Scan objects and remove unreachable
1532    let all_chunks = store.iter_chunks()?;
1533    let mut pruned = 0u64;
1534    let mut kept = 0u64;
1535    for (hash_hex, full_path) in &all_chunks {
1536        if !reachable.contains(hash_hex) {
1537            store.delete_chunk(hash_hex, Some(full_path))?;
1538            pruned += 1;
1539        } else {
1540            kept += 1;
1541        }
1542    }
1543
1544    if json {
1545        info!(
1546            "{}",
1547            serde_json::to_string(&serde_json::json!({
1548                "pruned": pruned,
1549                "remaining": kept,
1550            }))?
1551        );
1552    } else {
1553        info!("Pruned {} objects. {} objects remain.", pruned, kept);
1554    }
1555    Ok(())
1556}
1557
1558pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1559    let shard_dir = path.join(".shard");
1560    if !shard_dir.exists() {
1561        anyhow::bail!("not a shard repository (run `shard init` first)");
1562    }
1563
1564    // Validate multiaddr format
1565    if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1566        anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1567    }
1568
1569    let peers_path = shard_dir.join("peers.json");
1570    let mut peers: Vec<String> = if peers_path.exists() {
1571        let data = fs::read(&peers_path)?;
1572        serde_json::from_slice(&data)?
1573    } else {
1574        Vec::new()
1575    };
1576
1577    let added = if !peers.contains(&multiaddr.to_string()) {
1578        peers.push(multiaddr.to_string());
1579        let data = serde_json::to_vec(&peers)?;
1580        fs::write(peers_path, data)?;
1581        true
1582    } else {
1583        false
1584    };
1585
1586    if json {
1587        info!(
1588            "{}",
1589            serde_json::to_string(&serde_json::json!({
1590                "multiaddr": multiaddr,
1591                "added": added,
1592            }))?
1593        );
1594    } else if added {
1595        info!("Added peer: {}", multiaddr);
1596    } else {
1597        info!("Peer already exists: {}", multiaddr);
1598    }
1599
1600    Ok(())
1601}
1602
1603fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1604    let peers_path = shard_dir.join("peers.json");
1605    if peers_path.exists() {
1606        let data = fs::read(peers_path)?;
1607        Ok(serde_json::from_slice(&data)?)
1608    } else {
1609        Ok(Vec::new())
1610    }
1611}
1612
/// Location of the authorized-keys file: `<shard_dir>/authorized_keys`.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut location = shard_dir.to_path_buf();
    location.push("authorized_keys");
    location
}
1616
1617fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1618    let path = authorized_keys_path(shard_dir);
1619    if !path.exists() {
1620        return Ok(Vec::new());
1621    }
1622    let content = fs::read_to_string(&path)?;
1623    let mut keys = Vec::new();
1624    for line in content.lines() {
1625        let line = line.trim();
1626        if line.is_empty() || line.starts_with('#') {
1627            continue;
1628        }
1629        let bytes = hex::decode(line)?;
1630        let arr: [u8; 32] = bytes
1631            .as_slice()
1632            .try_into()
1633            .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1634        keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1635    }
1636    Ok(keys)
1637}
1638
1639pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1640    // Validate the key
1641    let bytes = hex::decode(public_key_hex)?;
1642    let arr: [u8; 32] = bytes
1643        .as_slice()
1644        .try_into()
1645        .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1646    let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1647
1648    let path = authorized_keys_path(shard_dir);
1649    let mut content = if path.exists() {
1650        fs::read_to_string(&path)?
1651    } else {
1652        String::new()
1653    };
1654    // Check if key already exists
1655    if content.lines().any(|l| l.trim() == public_key_hex) {
1656        info!("Key already authorized");
1657        return Ok(());
1658    }
1659    content.push_str(public_key_hex);
1660    content.push('\n');
1661    fs::write(&path, content)?;
1662    info!("Authorized key added");
1663    Ok(())
1664}
1665
1666pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
1667    let shard_dir = path.join(".shard");
1668    if !shard_dir.exists() {
1669        anyhow::bail!("not a shard repository (run `shard init` first)");
1670    }
1671    let file = fs::File::create(output)?;
1672    let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1673    let mut archive = tar::Builder::new(encoder);
1674    archive.append_dir_all(".shard", &shard_dir)?;
1675    archive.finish()?;
1676    if json {
1677        info!(
1678            "{}",
1679            serde_json::to_string(&serde_json::json!({
1680                "path": output.to_string_lossy(),
1681            }))?
1682        );
1683    } else {
1684        info!("Backup created: {}", output.display());
1685    }
1686    Ok(())
1687}
1688
1689pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1690    let shard_dir = path.join(".shard");
1691    if !shard_dir.exists() {
1692        anyhow::bail!("not a shard repository (run `shard init` first)");
1693    }
1694    let store = Store::open(&shard_dir)?;
1695    let cipher = maybe_load_cipher(&shard_dir)?;
1696    let commit = load_commit(&store, commit_id)?;
1697    let mut files = Vec::new();
1698    for manifest_id in &commit.manifests {
1699        let data = store.get_chunk(manifest_id)?;
1700        let manifest: FileManifest = metadata::deserialize(&data)?;
1701        let compression = manifest.compression.parse::<Compression>()?;
1702        if !json {
1703            info!("Exporting file: {}", manifest.name);
1704        }
1705        let mut file_data = Vec::new();
1706        for chunk_id in &manifest.chunks {
1707            let chunk_data = store.get_chunk(chunk_id)?;
1708            let decrypted = match &cipher {
1709                Some(c) => c.decrypt(&chunk_data)?,
1710                None => chunk_data,
1711            };
1712            let decompressed = compression.decompress(&decrypted)?;
1713            file_data.extend_from_slice(&decompressed);
1714        }
1715        let out_path = output_dir.join(&manifest.name);
1716        if let Some(parent) = out_path.parent() {
1717            fs::create_dir_all(parent)?;
1718        }
1719        fs::write(&out_path, file_data)?;
1720        if !json {
1721            info!("  -> {}", out_path.display());
1722        }
1723        files.push(manifest.name);
1724    }
1725    if json {
1726        info!(
1727            "{}",
1728            serde_json::to_string(&serde_json::json!({
1729                "commit_id": commit_id,
1730                "files": files,
1731                "output_dir": output_dir.to_string_lossy(),
1732            }))?
1733        );
1734    } else {
1735        info!("Export complete.");
1736    }
1737    Ok(())
1738}
1739
1740pub fn import(
1741    path: &Path,
1742    source_dir: &Path,
1743    message: &str,
1744    author: &str,
1745    json: bool,
1746) -> Result<()> {
1747    let shard_dir = path.join(".shard");
1748    if !shard_dir.exists() {
1749        anyhow::bail!("not a shard repository (run `shard init` first)");
1750    }
1751    // Walk files in source_dir
1752    let config = load_config(&shard_dir)?;
1753    let compression: Compression = config
1754        .get("compression")
1755        .map(|s| s.as_str())
1756        .unwrap_or("zstd")
1757        .parse()?;
1758    let chunker_mode = chunker::ChunkerMode::from_config(&config);
1759    let fmt = MetadataFormat::from_config(&config);
1760    let store = Store::open(&shard_dir)?;
1761    let cipher = maybe_load_cipher(&shard_dir)?;
1762    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
1763    if !source_dir.is_dir() {
1764        anyhow::bail!("Source must be a directory");
1765    }
1766    for entry in walkdir::WalkDir::new(source_dir)
1767        .into_iter()
1768        .filter_entry(|e| {
1769            e.file_name()
1770                .to_str()
1771                .map(|s| !s.starts_with('.'))
1772                .unwrap_or(false)
1773        })
1774    {
1775        let entry = entry?;
1776        if entry.file_type().is_file() {
1777            add_file(
1778                path,
1779                entry.path(),
1780                &store,
1781                &mut index,
1782                &compression,
1783                &chunker_mode,
1784                cipher.as_ref(),
1785                json,
1786            )?;
1787        }
1788    }
1789    index.save(&shard_dir.join("index"), &fmt)?;
1790    // Auto-commit
1791    if !index.files.is_empty() {
1792        commit(path, message, author, json)?;
1793    } else if json {
1794        info!(
1795            "{}",
1796            serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
1797        );
1798    } else {
1799        info!("No files found to import.");
1800    }
1801    Ok(())
1802}
1803
1804pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
1805    let shard_dir = path.join(".shard");
1806    if shard_dir.exists() {
1807        anyhow::bail!(
1808            "Repository already exists — remove .shard first or use a different directory"
1809        );
1810    }
1811    let file = fs::File::open(backup_file)?;
1812    let decoder = flate2::read::GzDecoder::new(file);
1813    let mut archive = tar::Archive::new(decoder);
1814    archive.unpack(path)?;
1815    // Verify the result
1816    if !path.join(".shard").exists() {
1817        anyhow::bail!("Backup does not contain a valid .shard directory");
1818    }
1819    if json {
1820        info!(
1821            "{}",
1822            serde_json::to_string(&serde_json::json!({
1823                "backup": backup_file.to_string_lossy(),
1824            }))?
1825        );
1826    } else {
1827        info!("Restored from {}", backup_file.display());
1828    }
1829    Ok(())
1830}
1831
1832struct RepoProvider {
1833    store: Store,
1834    shard_dir: std::path::PathBuf,
1835}
1836
1837impl shard_net::p2p::ShardContentProvider for RepoProvider {
1838    fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1839        self.store.get_chunk(id).ok()
1840    }
1841    fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1842        self.store.get_chunk(id).ok()
1843    }
1844    fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1845        let hash = blake3::hash(data);
1846        let hex = hash.to_hex().to_string();
1847        if hex != id {
1848            return false;
1849        }
1850        self.store
1851            .put_chunk(&crate::chunker::Chunk {
1852                hash,
1853                data: data.to_vec(),
1854                offset: 0,
1855            })
1856            .is_ok()
1857    }
1858    fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1859        use ed25519_dalek::Verifier;
1860        let pk_bytes: [u8; 32] = match public_key.try_into() {
1861            Ok(b) => b,
1862            Err(_) => return false,
1863        };
1864        let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1865            Ok(k) => k,
1866            Err(_) => return false,
1867        };
1868        let sig_bytes: [u8; 64] = match signature.try_into() {
1869            Ok(b) => b,
1870            Err(_) => return false,
1871        };
1872        let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1873        if pk.verify(nonce, &sig).is_err() {
1874            return false;
1875        }
1876        // Check authorized_keys if the file exists
1877        if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1878            if !keys.is_empty() {
1879                return keys.contains(&pk);
1880            }
1881        }
1882        true
1883    }
1884    fn repo_public_key(&self) -> Option<Vec<u8>> {
1885        let keys = load_keypair(&self.shard_dir).ok()?;
1886        Some(keys.verifying_key.to_bytes().to_vec())
1887    }
1888}
1889
1890/// Compute file count and total size for a commit's manifests.
1891fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
1892    let store = Store::open(shard_dir)?;
1893    let commit = load_commit(&store, commit_id)?;
1894    let mut file_count = 0u64;
1895    let mut total_size = 0u64;
1896    for mid in &commit.manifests {
1897        if let Ok(data) = store.get_chunk(mid) {
1898            if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
1899                file_count += 1;
1900                total_size += m.size;
1901            }
1902        }
1903    }
1904    Ok((file_count, total_size))
1905}
1906
1907pub async fn share(path: &Path, json: bool) -> Result<()> {
1908    let shard_dir = path.join(".shard");
1909    if !shard_dir.exists() {
1910        anyhow::bail!("not a shard repository (run `shard init` first)");
1911    }
1912
1913    let mut node = shard_net::p2p::Node::new().await?;
1914
1915    // Subscribe to global announcement topic
1916    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1917    node.subscribe(&ann_topic)?;
1918
1919    // Bootstrap from peers
1920    let peers = load_peers(&shard_dir)?;
1921    for peer in peers {
1922        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1923            let _ = node.swarm.dial(addr);
1924        }
1925    }
1926
1927    // Trigger Kademlia DHT bootstrap
1928    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
1929
1930    node.listen("/ip4/0.0.0.0/tcp/0").await?;
1931    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1932    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1933
1934    let config = load_config(&shard_dir)?;
1935    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1936
1937    let store = Store::open(&shard_dir)?;
1938    let provider = RepoProvider {
1939        store,
1940        shard_dir: shard_dir.clone(),
1941    };
1942
1943    // Publish announcement for HEAD commit
1944    if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
1945        let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
1946        let ann = shard_net::protocol::Announcement {
1947            commit_id: head,
1948            file_count,
1949            total_size,
1950            repo_name: repo_name.clone(),
1951            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
1952        };
1953        let payload = serde_json::to_vec(&ann)?;
1954        let _ = node.publish(&ann_topic, payload);
1955    }
1956
1957    if json {
1958        info!(
1959            "{}",
1960            serde_json::to_string(&serde_json::json!({
1961                "status": "sharing",
1962                "peer_id": node.local_peer_id().to_string(),
1963            }))?
1964        );
1965    } else {
1966        info!("Sharing repository...");
1967    }
1968    node.run(provider).await;
1969
1970    Ok(())
1971}
1972
1973/// Start a circuit relay v2 server for NAT traversal.
1974/// Listens on the given address and forwards traffic between peers.
1975pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
1976    let mut node = shard_net::p2p::Node::new().await?;
1977    node.listen(listen_addr).await?;
1978    if json {
1979        info!(
1980            "{}",
1981            serde_json::to_string(&serde_json::json!({
1982                "status": "relay active",
1983                "listen": listen_addr,
1984                "peer_id": node.local_peer_id().to_string(),
1985            }))?
1986        );
1987    } else {
1988        info!("Relay server active on {}", listen_addr);
1989        info!("Peer ID: {}", node.local_peer_id());
1990        info!("Ready to accept circuit relay v2 reservations");
1991    }
1992    node.run(EmptyProvider).await;
1993    Ok(())
1994}
1995
1996/// Minimal provider for relay-only mode (no repo content needed).
1997struct EmptyProvider;
1998impl shard_net::p2p::ShardContentProvider for EmptyProvider {
1999    fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
2000        None
2001    }
2002    fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
2003        None
2004    }
2005    fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
2006        false
2007    }
2008    fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
2009        false
2010    }
2011    fn repo_public_key(&self) -> Option<Vec<u8>> {
2012        None
2013    }
2014}
2015
2016pub async fn sync(path: &Path, _json: bool) -> Result<()> {
2017    let shard_dir = path.join(".shard");
2018    if !shard_dir.exists() {
2019        anyhow::bail!("not a shard repository (run `shard init` first)");
2020    }
2021
2022    let config = load_config(&shard_dir)?;
2023    let repo_id = config.get("repo_id").cloned().unwrap_or_default();
2024    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
2025    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
2026    let repo_topic =
2027        shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
2028
2029    let mut node = shard_net::p2p::Node::new().await?;
2030    node.subscribe(&ann_topic)?;
2031    node.subscribe(&repo_topic)?;
2032    node.listen("/ip4/0.0.0.0/tcp/0").await?;
2033    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
2034    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
2035
2036    // Bootstrap from configured peers
2037    let peers = load_peers(&shard_dir)?;
2038    for peer in peers {
2039        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
2040            let _ = node.swarm.dial(addr);
2041        }
2042    }
2043
2044    // Trigger Kademlia DHT bootstrap
2045    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2046
2047    // Helper to publish announcement to both global and repo-specific topics
2048    let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2049        if let Ok(payload) = serde_json::to_vec(ann) {
2050            let _ = node.publish(&ann_topic, payload.clone());
2051            let _ = node.publish(&repo_topic, payload);
2052        }
2053    };
2054
2055    let head_commit = branch::resolve_head(&shard_dir)?.1;
2056
2057    // Initial announce (may fail with InsufficientPeers if no peers yet)
2058    if let Some(ref head) = head_commit {
2059        let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2060        let ann = shard_net::protocol::Announcement {
2061            commit_id: head.clone(),
2062            file_count,
2063            total_size,
2064            repo_name: repo_name.clone(),
2065            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2066        };
2067        publish_ann(&mut node, &ann);
2068        if !_json {
2069            info!("Announced commit {} on sync topic", head)
2070        }
2071    } else if !_json {
2072        info!("No commits to announce");
2073    }
2074
2075    if !_json {
2076        info!("Syncing on topic with peer id: {}", node.local_peer_id());
2077    }
2078    let _ = std::io::stdout().flush();
2079
2080    let store = Store::open(&shard_dir)?;
2081    let mut provider = RepoProvider {
2082        store,
2083        shard_dir: shard_dir.clone(),
2084    };
2085
2086    let mut interval = tokio::time::interval(Duration::from_secs(5));
2087    let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2088        HashMap::new();
2089    let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2090    let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2091    let path_buf = path.to_path_buf();
2092
2093    loop {
2094        tokio::select! {
2095            _ = tokio::signal::ctrl_c() => {
2096                info!("\nSync shutting down...");
2097                break Ok(());
2098            }
2099            _ = interval.tick() => {
2100                // Periodic rate-limit counter reset (rolling 60s window)
2101                request_counts.clear();
2102                announce_counts.clear();
2103                if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2104                    let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2105                    let ann = shard_net::protocol::Announcement {
2106                        commit_id: head.clone(),
2107                        file_count: fc,
2108                        total_size: ts,
2109                        repo_name: repo_name.clone(),
2110                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2111                    };
2112                    publish_ann(&mut node, &ann);
2113                    info!("Re-announced commit {} on sync topic", head);
2114                }
2115            }
2116            event = node.swarm.select_next_some() => {
2117                match event {
2118                    shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2119                        info!("Listening on {address:?}");
2120                        let _ = std::io::stdout().flush();
2121                    }
2122                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2123                        shard_net::p2p::ShardBehaviourEvent::Mdns(
2124                            shard_net::libp2p::mdns::Event::Discovered(list),
2125                        ),
2126                    ) => {
2127                        for (peer_id, multiaddr) in list {
2128                            info!("mDNS discovered: {peer_id} {multiaddr}");
2129                            address_book.entry(peer_id).or_default().push(multiaddr.clone());
2130                            node.swarm
2131                                .behaviour_mut()
2132                                .gossipsub
2133                                .add_explicit_peer(&peer_id);
2134                            node.swarm
2135                                .behaviour_mut()
2136                                .kademlia
2137                                .add_address(&peer_id, multiaddr);
2138                        }
2139                    }
2140                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2141                        shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2142                            list,
2143                        )),
2144                    ) => {
2145                        for (peer_id, _multiaddr) in list {
2146                            info!("mDNS expired: {peer_id}");
2147                            node.swarm
2148                                .behaviour_mut()
2149                                .gossipsub
2150                                .remove_explicit_peer(&peer_id);
2151                        }
2152                    }
2153                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2154                        shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2155                            shard_net::libp2p::gossipsub::Event::Message {
2156                                propagation_source,
2157                                message,
2158                                ..
2159                            },
2160                        ),
2161                    ) => {
2162                        // Parse structured Announcement JSON
2163                        if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2164                            let peer = propagation_source;
2165                            let commit_id_owned = ann.commit_id;
2166                            info!(
2167                                "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2168                                peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2169                            );
2170                            // Store announcer's multiaddr if we don't have it yet
2171                            if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2172                                if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2173                                    address_book.entry(peer).or_default().push(addr);
2174                                }
2175                            }
2176                            // Rate limiting: track announcement count per peer
2177                            let rate_key = (peer, commit_id_owned.clone());
2178                            if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2179                                warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2180                            } else {
2181                                *announce_counts.entry(rate_key).or_insert(0) += 1;
2182                                // Reply with our HEAD if different
2183                                let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2184                                if our_head != commit_id_owned {
2185                                    let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2186                                    let reply = shard_net::protocol::Announcement {
2187                                        commit_id: our_head,
2188                                        file_count: fc,
2189                                        total_size: ts,
2190                                        repo_name: repo_name.clone(),
2191                                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2192                                    };
2193                                    publish_ann(&mut node, &reply);
2194                                }
2195                                if let Some(addrs) = address_book.get(&peer) {
2196                                    if let Some(addr) = addrs.first() {
2197                                        let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2198                                        let path_clone = path_buf.clone();
2199                                        tokio::spawn(async move {
2200                                            match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2201                                                Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2202                                                Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2203                                            }
2204                                        });
2205                                    }
2206                                }
2207                            }
2208                        }
2209                    }
2210                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2211                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2212                            shard_net::libp2p::request_response::Event::Message { peer, message },
2213                        ),
2214                    ) => {
2215                        if let shard_net::libp2p::request_response::Message::Request {
2216                            request, channel, ..
2217                        } = message
2218                        {
2219                            // Per-peer request rate limiting: max 50 requests in any 60s window
2220                            let req_count = request_counts.entry(peer).or_insert(0u32);
2221                            *req_count += 1;
2222                            if *req_count > 50 {
2223                                warn!("Dropping request from {}: rate limit exceeded", peer);
2224                                // Reset counter periodically (cooldown via interval tick)
2225                            } else {
2226                                info!("Received request from {}", peer);
2227                                node.serve_request(&peer, &mut provider, request, channel);
2228                            }
2229                        } else {
2230                            info!("Received Response from {}", peer);
2231                        }
2232                    }
2233                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2234                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2235                            shard_net::libp2p::request_response::Event::OutboundFailure {
2236                                peer, error, ..
2237                            },
2238                        ),
2239                    ) => {
2240                        error!("Outbound failure to {}: {:?}", peer, error);
2241                    }
2242                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2243                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2244                            shard_net::libp2p::request_response::Event::InboundFailure {
2245                                peer, error, ..
2246                            },
2247                        ),
2248                    ) => {
2249                        error!("Inbound failure from {}: {:?}", peer, error);
2250                    }
2251                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2252                        shard_net::p2p::ShardBehaviourEvent::Identify(
2253                            shard_net::libp2p::identify::Event::Received { peer_id, info },
2254                        ),
2255                    ) => {
2256                        info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2257                        for addr in info.listen_addrs {
2258                            address_book.entry(peer_id).or_default().push(addr);
2259                        }
2260                        let _ = std::io::stdout().flush();
2261                    }
2262                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2263                        shard_net::p2p::ShardBehaviourEvent::Identify(event),
2264                    ) => {
2265                        info!("Identify event: {:?}", event);
2266                    }
2267                    shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2268                        info!("Connection established with {}", peer_id);
2269                        // Only store the address when we dialed (it's the peer's listen addr).
2270                        // For listener connections, send_back_addr is the ephemeral port — useless for dialing back.
2271                        if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2272                            address_book.entry(peer_id).or_default().push(address.clone());
2273                        }
2274                        node.swarm
2275                            .behaviour_mut()
2276                            .gossipsub
2277                            .add_explicit_peer(&peer_id);
2278                        // Announce HEAD to the newly connected peer
2279                        if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2280                            let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2281                            let ann = shard_net::protocol::Announcement {
2282                                commit_id: head.clone(),
2283                                file_count: fc,
2284                                total_size: ts,
2285                                repo_name: repo_name.clone(),
2286                                peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2287                            };
2288                            publish_ann(&mut node, &ann);
2289                        }
2290                    }
2291                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2292                        shard_net::p2p::ShardBehaviourEvent::Relay(event),
2293                    ) => {
2294                        info!("Relay event: {:?}", event);
2295                    }
2296                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2297                        shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2298                    ) => {
2299                        info!("DCUtR event: {:?}", event);
2300                    }
2301                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2302                        shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2303                    ) => {
2304                        info!("AutoNAT event: {:?}", event);
2305                    }
2306                    shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2307                        local_addr,
2308                        send_back_addr,
2309                        ..
2310                    } => {
2311                        info!(
2312                            "Incoming connection from {} to {}",
2313                            send_back_addr, local_addr
2314                        );
2315                    }
2316                    e => {
2317                        info!("Event: {:?}", e);
2318                    }
2319                }
2320            }
2321        }
2322    }
2323}
2324
2325pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2326    let shard_dir = path.join(".shard");
2327    // pull can work on empty repo or existing one.
2328
2329    if !shard_dir.exists() {
2330        init(path, "flat", "zstd", "fixed", None, false, false)?;
2331    }
2332
2333    let store = Store::open(&shard_dir)?;
2334    let cipher = maybe_load_cipher(&shard_dir)?;
2335
2336    let mut node = shard_net::p2p::Node::new().await?;
2337
2338    // Parse peer multiaddr
2339    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2340    let peer_id = match multiaddr.iter().last() {
2341        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2342        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2343    };
2344
2345    // 1. Get Commit (sequential — single request)
2346    if !json {
2347        info!("Pulling commit {} from {}...", commit_id, peer);
2348    }
2349    let commit_data = node
2350        .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2351        .await?;
2352    let hash = blake3::hash(&commit_data);
2353    if hash.to_hex().to_string() != commit_id {
2354        anyhow::bail!("Commit hash mismatch");
2355    }
2356    let chunk = crate::chunker::Chunk {
2357        hash,
2358        data: commit_data.clone(),
2359        offset: 0,
2360    };
2361    store.put_chunk(&chunk)?;
2362
2363    let commit: Commit = metadata::deserialize(&commit_data)?;
2364    if !json {
2365        info!("Got commit: {}", commit.message);
2366    }
2367
2368    // Fetch key rotation records for this commit's key chain
2369    let keys_dir = shard_dir.join("keys");
2370    if let Some(kid) = &commit.key_id {
2371        if keys_dir.join("records").exists() {
2372            if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2373                let missing_rotations: Vec<&KeyRotation> = chain
2374                    .iter()
2375                    .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2376                    .collect();
2377                if !missing_rotations.is_empty() {
2378                    if !json {
2379                        info!(
2380                            "Fetching {} key rotation records from peer...",
2381                            missing_rotations.len()
2382                        );
2383                    }
2384                    let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2385                        missing_rotations
2386                            .iter()
2387                            .map(|r| {
2388                                (
2389                                    r.rotation_id.clone(),
2390                                    shard_net::protocol::ShardRequest::GetChunk(
2391                                        r.rotation_id.clone(),
2392                                    ),
2393                                )
2394                            })
2395                            .collect();
2396                    if let Ok(rot_results) = node
2397                        .request_parallel(&multiaddr, peer_id, rot_requests)
2398                        .await
2399                    {
2400                        for (rot_id, rot_data) in &rot_results {
2401                            let rh = blake3::hash(rot_data);
2402                            if rh.to_hex().to_string() != *rot_id {
2403                                info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2404                                continue;
2405                            }
2406                            store.put_chunk(&crate::chunker::Chunk {
2407                                hash: rh,
2408                                data: rot_data.clone(),
2409                                offset: 0,
2410                            })?;
2411                        }
2412                        if !json {
2413                            info!("Key rotation records synced from peer.");
2414                        }
2415                    }
2416                }
2417            }
2418        }
2419    }
2420
2421    // Set repo_id from commit's public key so clones share the gossipsub topic
2422    if let Some(pk_hex) = &commit.public_key {
2423        let pk_bytes = hex::decode(pk_hex)?;
2424        let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2425        let mut config = load_config(&shard_dir)?;
2426        config.insert("repo_id".to_string(), repo_id);
2427        save_config(&shard_dir, &config)?;
2428    }
2429
2430    // 2. Fetch all manifests in parallel
2431    let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2432        .manifests
2433        .iter()
2434        .map(|id| {
2435            (
2436                id.clone(),
2437                shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2438            )
2439        })
2440        .collect();
2441    let manifest_results = node
2442        .request_parallel(&multiaddr, peer_id, manifest_requests)
2443        .await?;
2444
2445    let mut all_chunk_ids: Vec<String> = Vec::new();
2446    let mut file_manifests: Vec<FileManifest> = Vec::new();
2447    // Map chunk_id -> compression type for verification in step 3
2448    let mut chunk_compression: HashMap<String, String> = HashMap::new();
2449
2450    for (manifest_id, manifest_data) in &manifest_results {
2451        let hash = blake3::hash(manifest_data);
2452        if hash.to_hex().to_string() != *manifest_id {
2453            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2454        }
2455        let chunk = crate::chunker::Chunk {
2456            hash,
2457            data: manifest_data.clone(),
2458            offset: 0,
2459        };
2460        store.put_chunk(&chunk)?;
2461        let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2462        if !json {
2463            info!(
2464                "Fetching file: {} (compression: {})",
2465                manifest.name, manifest.compression
2466            );
2467        }
2468        for cid in &manifest.chunks {
2469            chunk_compression.insert(cid.clone(), manifest.compression.clone());
2470        }
2471        all_chunk_ids.extend(manifest.chunks.clone());
2472        file_manifests.push(manifest);
2473    }
2474
2475    // 3. Resume: recover from partial directory if previous transfer was interrupted
2476    let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2477    let partial_chunks = partial.list_chunks()?;
2478    let mut recovered = 0usize;
2479    for chunk_id in &partial_chunks {
2480        if let Ok(data) = partial.load_chunk(chunk_id) {
2481            let hash = blake3::hash(&data);
2482            if hash.to_hex().to_string() == *chunk_id {
2483                // Recovered chunk matches — save to store
2484                let chunk = crate::chunker::Chunk {
2485                    hash,
2486                    data: data.clone(),
2487                    offset: 0,
2488                };
2489                if store.put_chunk(&chunk).is_ok() {
2490                    recovered += 1;
2491                }
2492            } else {
2493                // Corrupted partial chunk — remove and re-fetch
2494                let _ = partial.remove_chunk(chunk_id);
2495            }
2496        }
2497    }
2498    if recovered > 0 {
2499        info!("Recovered {} chunks from partial transfer", recovered);
2500    }
2501
2502    // 4. Fetch all missing chunks (not in store and not in partial)
2503    let needed_chunks: Vec<String> = all_chunk_ids
2504        .into_iter()
2505        .filter(|id| store.get_chunk(id).is_err())
2506        .collect();
2507
2508    if !needed_chunks.is_empty() {
2509        if !json {
2510            info!("Fetching {} chunks...", needed_chunks.len());
2511        }
2512        let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2513            .iter()
2514            .map(|id| {
2515                (
2516                    id.clone(),
2517                    shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2518                )
2519            })
2520            .collect();
2521        let chunk_results = node
2522            .request_parallel(&multiaddr, peer_id, chunk_requests)
2523            .await?;
2524        for (chunk_id, chunk_data) in &chunk_results {
2525            // Determine compression from the manifest this chunk belongs to
2526            let compression: Compression = chunk_compression
2527                .get(chunk_id)
2528                .map(|s| s.as_str())
2529                .unwrap_or("none")
2530                .parse()?;
2531            // Decrypt (if private) then decompress to verify the content hash
2532            let decrypted = match &cipher {
2533                Some(c) => c.decrypt(chunk_data)?,
2534                None => chunk_data.clone(),
2535            };
2536            let decompressed = compression.decompress(&decrypted)?;
2537            let hash = blake3::hash(&decompressed);
2538            if hash.to_hex().to_string() != *chunk_id {
2539                anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2540            }
2541            // Store the data as received (encrypted for private repos)
2542            let chunk = crate::chunker::Chunk {
2543                hash,
2544                data: chunk_data.clone(),
2545                offset: 0,
2546            };
2547            store.put_chunk(&chunk)?;
2548            // Save to partial for resume support
2549            partial.save_chunk(chunk_id, chunk_data)?;
2550        }
2551    }
2552
2553    // 5. Reconstruct all files
2554    for manifest in &file_manifests {
2555        let compression = manifest.compression.parse::<Compression>()?;
2556        let mut file_data = Vec::new();
2557        for chunk_id in &manifest.chunks {
2558            let stored = store.get_chunk(chunk_id)?;
2559            let decrypted = match &cipher {
2560                Some(c) => c.decrypt(&stored)?,
2561                None => stored,
2562            };
2563            let decompressed = compression.decompress(&decrypted)?;
2564            file_data.extend_from_slice(&decompressed);
2565        }
2566        fs::write(path.join(&manifest.name), file_data)?;
2567        if !json {
2568            info!(
2569                "Reconstructed file: {} ({} bytes)",
2570                manifest.name, manifest.size
2571            );
2572        }
2573    }
2574
2575    // 6. Clean up partial transfer tracking
2576    partial.cleanup()?;
2577
2578    if json {
2579        info!(
2580            "{}",
2581            serde_json::to_string(&serde_json::json!({
2582                "status": "pull complete",
2583                "commit_id": commit_id,
2584            }))?
2585        );
2586    } else {
2587        info!("Pull complete.");
2588    }
2589    Ok(())
2590}
2591
2592pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2593    let shard_dir = path.join(".shard");
2594    if !shard_dir.exists() {
2595        anyhow::bail!("not a shard repository (run `shard init` first)");
2596    }
2597    let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2598    if json {
2599        info!("{}", serde_json::to_string(&transfers)?);
2600    } else {
2601        if transfers.is_empty() {
2602            info!("No incomplete transfers.");
2603        } else {
2604            for t in &transfers {
2605                info!("Incomplete transfer: {}", t);
2606            }
2607        }
2608    }
2609    Ok(())
2610}
2611
2612pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2613    let shard_dir = path.join(".shard");
2614    if !shard_dir.exists() {
2615        anyhow::bail!("not a shard repository (run `shard init` first)");
2616    }
2617    partial::remove_transfer(&shard_dir, commit_id)?;
2618    info!("Removed transfer tracking for {}", commit_id);
2619    Ok(())
2620}
2621
2622pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
2623    let shard_dir = path.join(".shard");
2624    if !shard_dir.exists() {
2625        anyhow::bail!("not a shard repository (run `shard init` first)");
2626    }
2627
2628    let (_, head_id) = branch::resolve_head(&shard_dir)?;
2629    let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
2630
2631    let store = Store::open(&shard_dir)?;
2632
2633    // Collect all reachable objects
2634    let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
2635        std::collections::BTreeMap::new();
2636
2637    // Walk commits
2638    let mut seen = std::collections::HashSet::new();
2639    let mut stack = vec![head_id.clone()];
2640    while let Some(cid) = stack.pop() {
2641        if !seen.insert(cid.clone()) {
2642            continue;
2643        }
2644        if let Ok(data) = store.get_chunk(&cid) {
2645            objects.insert(cid, data.clone());
2646            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
2647                // Include key rotation records for this commit's key chain
2648                if let Some(kid) = &commit.key_id {
2649                    let keys_dir = shard_dir.join("keys");
2650                    if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2651                        for rot in &chain {
2652                            let rj = serde_json::to_vec(rot)?;
2653                            let rh = blake3::hash(&rj);
2654                            if !store.has_chunk(rh.to_hex().as_ref()) {
2655                                store.put_chunk(&crate::chunker::Chunk {
2656                                    hash: rh,
2657                                    data: rj.clone(),
2658                                    offset: 0,
2659                                })?;
2660                            }
2661                            objects.insert(rot.rotation_id.clone(), rj);
2662                        }
2663                    }
2664                }
2665                for mid in &commit.manifests {
2666                    if let Ok(manifest_data) = store.get_chunk(mid) {
2667                        objects.insert(mid.clone(), manifest_data.clone());
2668                        if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
2669                        {
2670                            for cid in &manifest.chunks {
2671                                if let Ok(chunk_data) = store.get_chunk(cid) {
2672                                    objects.insert(cid.clone(), chunk_data);
2673                                }
2674                            }
2675                        }
2676                    }
2677                }
2678                for parent in &commit.parents {
2679                    stack.push(parent.clone());
2680                }
2681            }
2682        }
2683    }
2684
2685    if !json {
2686        info!(
2687            "Pushing {} objects ({} bytes)...",
2688            objects.len(),
2689            objects.values().map(|v| v.len() as u64).sum::<u64>()
2690        );
2691    }
2692
2693    // Connect and send all objects
2694    let mut node = shard_net::p2p::Node::new().await?;
2695    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2696    let peer_id = match multiaddr.iter().last() {
2697        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2698        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2699    };
2700
2701    for (id, data) in &objects {
2702        node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
2703            .await?;
2704    }
2705
2706    if json {
2707        info!(
2708            "{}",
2709            serde_json::to_string(&serde_json::json!({
2710                "status": "push complete",
2711                "objects": objects.len(),
2712            }))?
2713        );
2714    } else {
2715        info!("Push complete ({} objects).", objects.len());
2716    }
2717    Ok(())
2718}
2719
2720/// Rotate the signing key: generates a new ed25519 keypair, archives the old
2721/// one, and persists a signed rotation record.
2722pub fn key_rotate(path: &Path) -> Result<()> {
2723    let shard_dir = path.join(".shard");
2724    if !shard_dir.exists() {
2725        anyhow::bail!("not a shard repository (run `shard init` first)");
2726    }
2727    let keys_dir = shard_dir.join("keys");
2728    let rotation = keychain::rotate_signing_key(&keys_dir)?;
2729
2730    // Also update global keyring
2731    if let Some(global) = global_keys_dir() {
2732        fs::create_dir_all(&global).ok();
2733        if let Ok(new_keys) = KeyPair::load(&keys_dir) {
2734            let _ = new_keys.save(&global);
2735        }
2736    }
2737
2738    // Store rotation record as a content-addressed chunk in the DAG
2739    let store = Store::open(&shard_dir)?;
2740    let json = serde_json::to_vec(&rotation)?;
2741    let hash = blake3::hash(&json);
2742    if !store.has_chunk(hash.to_hex().as_ref()) {
2743        store.put_chunk(&crate::chunker::Chunk {
2744            hash,
2745            data: json,
2746            offset: 0,
2747        })?;
2748    }
2749
2750    // Replicate all rotation records as chunks for P2P availability
2751    let rotations = keychain::load_rotations(&keys_dir)?;
2752    for rot in &rotations {
2753        let rj = serde_json::to_vec(rot)?;
2754        let rh = blake3::hash(&rj);
2755        if !store.has_chunk(rh.to_hex().as_ref()) {
2756            store.put_chunk(&crate::chunker::Chunk {
2757                hash: rh,
2758                data: rj,
2759                offset: 0,
2760            })?;
2761        }
2762    }
2763
2764    info!(
2765        "Key rotated: {} -> {}",
2766        rotation.old_key_id, rotation.new_key_id
2767    );
2768    info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
2769    Ok(())
2770}
2771
2772/// List all keys in the keychain with their validity info.
2773pub fn key_list(path: &Path, json: bool) -> Result<()> {
2774    let shard_dir = path.join(".shard");
2775    if !shard_dir.exists() {
2776        anyhow::bail!("not a shard repository (run `shard init` first)");
2777    }
2778    let keys_dir = shard_dir.join("keys");
2779    let records = keychain::load_records(&keys_dir)?;
2780    let current_id = keychain::get_current_key_id(&keys_dir)?;
2781
2782    if json {
2783        info!(
2784            "{}",
2785            serde_json::to_string_pretty(&serde_json::json!({
2786                "current": current_id,
2787                "records": &records,
2788            }))?
2789        );
2790    } else {
2791        info!("Current key: {}", current_id);
2792        info!("Key history:");
2793        for record in &records {
2794            let marker = if record.key_id == current_id {
2795                " (active)"
2796            } else {
2797                ""
2798            };
2799            info!(
2800                "  {}  created_at={}{}",
2801                record.key_id, record.created_at, marker
2802            );
2803            if let Some(prev) = &record.previous_key_id {
2804                info!("    previous: {}", prev);
2805            }
2806        }
2807    }
2808    Ok(())
2809}
2810
2811/// Verify the integrity of the keychain: check every rotation's signature.
2812pub fn key_verify(path: &Path, json: bool) -> Result<()> {
2813    let shard_dir = path.join(".shard");
2814    if !shard_dir.exists() {
2815        anyhow::bail!("not a shard repository (run `shard init` first)");
2816    }
2817    let keys_dir = shard_dir.join("keys");
2818    let errors = keychain::verify_keychain(&keys_dir)?;
2819
2820    if json {
2821        info!(
2822            "{}",
2823            serde_json::to_string(&serde_json::json!({
2824                "verified": errors.is_empty(),
2825                "errors": errors,
2826            }))?
2827        );
2828    } else {
2829        if errors.is_empty() {
2830            info!("Keychain verification successful.");
2831        } else {
2832            for err in &errors {
2833                error!("Keychain error: {}", err);
2834            }
2835            anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
2836        }
2837    }
2838    Ok(())
2839}