Skip to main content

shard_core/
lib.rs

1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod encryption;
6pub mod index;
7pub mod keychain;
8pub mod manifest;
9pub mod metadata;
10pub mod partial;
11pub mod store;
12pub mod wal;
13
14use crate::commit::Commit;
15use crate::compression::Compression;
16use crate::index::Index;
17use crate::keychain::KeyRotation;
18use crate::manifest::FileManifest;
19use crate::store::Store;
20use anyhow::Result;
21use ed25519_dalek::{Signer, Verifier};
22use metadata::MetadataFormat;
23use serde::Serialize;
24use shard_crypto::KeyPair;
25use shard_net::libp2p::futures::StreamExt;
26use similar::TextDiff;
27use std::collections::HashMap;
28use std::fs;
29use std::io::Write;
30use std::path::Path;
31use std::path::PathBuf;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33use tracing::{error, info, warn};
34
35pub fn init(
36    path: &Path,
37    backend: &str,
38    compression_algo: &str,
39    chunker_mode: &str,
40    chunk_size: Option<u64>,
41    is_private: bool,
42    json: bool,
43) -> Result<()> {
44    let shard_dir = path.join(".shard");
45    if shard_dir.exists() {
46        anyhow::bail!(
47            "repository already initialized at {} (run `shard status` to confirm)",
48            shard_dir.display()
49        );
50    }
51    fs::create_dir_all(shard_dir.join("objects"))?;
52    fs::create_dir_all(shard_dir.join("keys"))?;
53    fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
54    branch::set_head_branch(&shard_dir, "main")?;
55
56    let keys = KeyPair::generate();
57    keys.save(&shard_dir.join("keys"))?;
58    if let Some(global) = global_keys_dir() {
59        fs::create_dir_all(&global).ok();
60        let _ = keys.save(&global);
61        let _ = keychain::init_keychain(&global);
62    }
63    keychain::init_keychain(&shard_dir.join("keys"))?;
64
65    // Generate a deterministic repo identity from the public key
66    // (same key = same repo_id, so clones share the gossipsub topic)
67    let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
68    let repo_id = blake3::hash(&pubkey).to_hex().to_string();
69    let mut config = load_config(&shard_dir)?;
70
71    if is_private {
72        let key = encryption::generate_repo_key();
73        encryption::save_repo_key(&shard_dir.join("keys"), &key)?;
74        config.insert("private".to_string(), "true".to_string());
75    }
76    config.insert("repo_id".to_string(), repo_id);
77    config.insert("storage_backend".to_string(), backend.to_string());
78    config.insert(
79        "serialization_format".to_string(),
80        MetadataFormat::Json.config_value().to_string(),
81    );
82    config.insert("compression".to_string(), compression_algo.to_string());
83    config.insert("chunker_mode".to_string(), chunker_mode.to_string());
84    match chunker_mode {
85        "rabin" => {
86            let chunk_size = chunk_size.unwrap_or(4_194_304);
87            let min = chunk_size / 4;
88            let max = chunk_size * 2;
89            config.insert("chunk_min".to_string(), min.to_string());
90            config.insert("chunk_avg".to_string(), chunk_size.to_string());
91            config.insert("chunk_max".to_string(), max.to_string());
92        }
93        _ => {
94            let cs = chunk_size.unwrap_or(4_194_304);
95            config.insert("chunk_size".to_string(), cs.to_string());
96        }
97    }
98    save_config(&shard_dir, &config)?;
99
100    let chunker_desc = if chunker_mode == "rabin" {
101        format!(
102            "rabin (avg {} bytes)",
103            config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
104        )
105    } else {
106        format!(
107            "fixed ({} bytes)",
108            config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
109        )
110    };
111    if json {
112        info!(
113            "{}",
114            serde_json::to_string(&serde_json::json!({
115                "path": shard_dir.display().to_string(),
116                "backend": backend,
117                "compression": compression_algo,
118                "chunker": chunker_desc,
119                "private": is_private,
120            }))?
121        );
122    } else {
123        info!(
124            "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
125            shard_dir.display(),
126            backend,
127            compression_algo,
128            chunker_desc,
129        );
130    }
131    Ok(())
132}
133
/// Returns `file_path` expressed relative to `repo_root`, as a lossily-decoded string.
///
/// Both paths are canonicalized when possible; a path that cannot be canonicalized is used
/// as given. If the file does not live under the repository root, only its final path
/// component is returned (or an empty string when it has none).
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    // Resolve symlinks / relative segments, falling back to the raw path on failure.
    let resolve = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let root_abs = resolve(repo_root);
    let file_abs = resolve(file_path);

    match file_abs.strip_prefix(&root_abs) {
        Ok(inside_repo) => inside_repo.to_string_lossy().to_string(),
        Err(_) => match file_path.file_name() {
            Some(base) => base.to_string_lossy().to_string(),
            None => String::new(),
        },
    }
}
150
/// Guesses a MIME type from the file extension (case-insensitive).
///
/// Returns `None` when the path has no extension, the extension is not valid UTF-8, or the
/// extension is not in the known table.
fn detect_content_type(file_path: &Path) -> Option<String> {
    // Lower-case extension -> MIME type. Aliases are listed as separate rows.
    const MIME_BY_EXTENSION: &[(&str, &str)] = &[
        ("txt", "text/plain"),
        ("json", "application/json"),
        ("csv", "text/csv"),
        ("png", "image/png"),
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("gif", "image/gif"),
        ("pdf", "application/pdf"),
        ("yaml", "application/x-yaml"),
        ("yml", "application/x-yaml"),
        ("md", "text/markdown"),
        ("html", "text/html"),
        ("htm", "text/html"),
        ("py", "text/x-python"),
        ("rs", "text/x-rust"),
        ("ts", "text/x-typescript"),
        ("js", "application/javascript"),
        ("wasm", "application/wasm"),
        ("toml", "application/toml"),
        ("xml", "application/xml"),
        ("zip", "application/zip"),
        ("tar", "application/x-tar"),
        ("gz", "application/gzip"),
        ("bin", "application/octet-stream"),
        ("pt", "application/x-model"),
        ("pth", "application/x-model"),
        ("ckpt", "application/x-model"),
        ("safetensors", "application/x-model"),
    ];

    let wanted = file_path.extension()?.to_str()?.to_lowercase();
    MIME_BY_EXTENSION
        .iter()
        .find(|(known, _)| *known == wanted.as_str())
        .map(|(_, mime)| mime.to_string())
}
180
181#[allow(clippy::too_many_arguments)]
182fn add_file(
183    repo_root: &Path,
184    file_path: &Path,
185    store: &Store,
186    index: &mut Index,
187    compression: &Compression,
188    chunker_mode: &chunker::ChunkerMode,
189    cipher: Option<&encryption::RepoCipher>,
190    _json: bool,
191) -> Result<()> {
192    let file = fs::File::open(file_path)?;
193    let mut chunker = match chunker_mode {
194        chunker::ChunkerMode::Fixed { chunk_size } => {
195            chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
196        }
197        chunker::ChunkerMode::Rabin { min, avg, max } => {
198            chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
199        }
200    };
201    let mut chunk_hashes = Vec::new();
202    let mut total_size = 0;
203
204    while let Some(chunk) = chunker.next_chunk()? {
205        let hash = chunk.hash;
206        let compressed_data = compression.compress(&chunk.data)?;
207        let stored_data = match cipher {
208            Some(c) => c.encrypt(&compressed_data),
209            None => compressed_data,
210        };
211        let stored = crate::chunker::Chunk {
212            hash,
213            data: stored_data,
214            offset: chunk.offset,
215        };
216        store.put_chunk(&stored)?;
217        chunk_hashes.push(hash.to_hex().to_string());
218        total_size += chunk.data.len() as u64;
219    }
220
221    let name = relative_path(repo_root, file_path);
222    let manifest = FileManifest {
223        name: name.clone(),
224        size: total_size,
225        chunks: chunk_hashes.clone(),
226        content_type: detect_content_type(file_path),
227        compression: compression.as_str().to_string(),
228        merkle_root: Some(FileManifest::merkle_root(&chunk_hashes)),
229        created_by: None,
230        created_at: None,
231        signature: None,
232    };
233
234    index.files.insert(name.clone(), manifest);
235    if !_json {
236        info!("Added {} ({})", name, total_size);
237    }
238    Ok(())
239}
240
241pub fn recover(path: &Path, json: bool) -> Result<()> {
242    let shard_dir = path.join(".shard");
243    if !shard_dir.exists() {
244        anyhow::bail!("not a shard repository (run `shard init` first)");
245    }
246    wal::recover(&shard_dir)?;
247    if json {
248        info!(
249            "{}",
250            serde_json::to_string(&serde_json::json!({"status": "recovery complete"}))?
251        );
252    } else {
253        info!("Recovery complete.");
254    }
255    Ok(())
256}
257
258pub fn add(path: &Path, file_path: &Path, json: bool) -> Result<()> {
259    let shard_dir = path.join(".shard");
260    if !shard_dir.exists() {
261        anyhow::bail!("not a shard repository (run `shard init` first)");
262    }
263
264    wal::recover(&shard_dir)?;
265
266    let config = load_config(&shard_dir)?;
267    let compression: Compression = config
268        .get("compression")
269        .map(|s| s.as_str())
270        .unwrap_or("zstd")
271        .parse()?;
272
273    let chunker_mode = chunker::ChunkerMode::from_config(&config);
274    let fmt = MetadataFormat::from_config(&config);
275
276    let store = Store::open(&shard_dir)?;
277    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
278
279    let cipher = maybe_load_cipher(&shard_dir)?;
280
281    if file_path.is_dir() {
282        for entry in walkdir::WalkDir::new(file_path)
283            .into_iter()
284            .filter_entry(|e| {
285                e.file_name()
286                    .to_str()
287                    .map(|s| !s.starts_with('.'))
288                    .unwrap_or(false)
289            })
290        {
291            let entry = entry?;
292            if entry.file_type().is_file() {
293                add_file(
294                    path,
295                    entry.path(),
296                    &store,
297                    &mut index,
298                    &compression,
299                    &chunker_mode,
300                    cipher.as_ref(),
301                    json,
302                )?;
303            }
304        }
305    } else {
306        add_file(
307            path,
308            file_path,
309            &store,
310            &mut index,
311            &compression,
312            &chunker_mode,
313            cipher.as_ref(),
314            json,
315        )?;
316    }
317
318    index.save(&shard_dir.join("index"), &fmt)?;
319    if json {
320        info!(
321            "{}",
322            serde_json::to_string(&serde_json::json!({"status": "added"}))?
323        );
324    }
325    Ok(())
326}
327
/// Turns the staged index into a signed commit and advances HEAD (or the current branch).
///
/// Sequence: run WAL recovery, load config/store/index, record a `CommitBegin` WAL entry
/// holding the previous HEAD and index contents, sign and store every staged manifest, build
/// and sign the commit object, store it, check for an ancestry cycle, update the branch ref or
/// HEAD, clear the index, then append `CommitEnd` and truncate the WAL.
///
/// # Arguments
/// * `path` - directory containing the `.shard` directory.
/// * `message` - commit message, stored verbatim.
/// * `author` - stored on the commit and stamped onto every manifest as `created_by`.
/// * `json` - log the result as a JSON object instead of a sentence.
///
/// # Errors
/// Fails when the repository is missing, the index is empty, a cycle is detected, or any
/// store/WAL/key/filesystem operation fails. An error after `CommitBegin` returns early
/// without appending `CommitEnd`.
// NOTE(review): presumably `wal::recover` uses the `CommitBegin` backups to roll back such an
// interrupted commit on the next invocation — confirm against the `wal` module.
pub fn commit(path: &Path, message: &str, author: &str, json: bool) -> Result<()> {
    let shard_dir = path.join(".shard");
    if !shard_dir.exists() {
        anyhow::bail!("not a shard repository (run `shard init` first)");
    }

    // Recover from any previous crash before mutating
    wal::recover(&shard_dir)?;

    let config = load_config(&shard_dir)?;
    let fmt = MetadataFormat::from_config(&config);

    let store = Store::open(&shard_dir)?;
    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;

    if index.files.is_empty() {
        anyhow::bail!("nothing to commit (stage files with `shard add` first)");
    }

    let head_path = shard_dir.join("HEAD");

    // WAL: back up pre-commit state
    let wal = wal::Wal::new(&shard_dir);
    // HEAD may legitimately be unreadable here; in that case no HEAD backup is recorded.
    let head_backup = fs::read_to_string(&head_path).ok();
    let index_backup = fs::read(shard_dir.join("index"))?;
    wal.append(&wal::WalEntry::CommitBegin {
        head_backup,
        index_backup,
    })?;

    // 1. Store manifests (signed)
    // One timestamp (seconds since the Unix epoch) is shared by all manifests and the commit.
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let keys = load_keypair(&shard_dir)?;
    let signing_key = keys.signing_key;
    let mut manifest_ids = Vec::new();
    for manifest in index.files.values_mut() {
        manifest.created_by = Some(author.to_string());
        manifest.created_at = Some(timestamp);

        // Sign the manifest with its signature field cleared, so verification can rebuild
        // the exact same bytes.
        let mut unsigned = manifest.clone();
        unsigned.signature = None;
        let canonical = metadata::serialize_for_signing(&unsigned);
        let sig = signing_key.sign(&canonical);
        manifest.signature = Some(hex::encode(sig.to_bytes()));

        // The manifest is stored as a chunk addressed by the BLAKE3 hash of its encoding.
        let encoded = metadata::serialize(manifest, &fmt);
        let hash = blake3::hash(&encoded);
        let chunk = crate::chunker::Chunk {
            hash,
            data: encoded,
            offset: 0,
        };
        store.put_chunk(&chunk)?;
        manifest_ids.push(hash.to_hex().to_string());
    }
    // Sorting makes the manifest list independent of the index map's iteration order.
    manifest_ids.sort();

    // 2. Get parent
    let mut parents = Vec::new();
    let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
    if let Some(pid) = parent_id {
        parents.push(pid);
    }

    // 3. Create commit
    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
    // A missing key id is tolerated: the commit is then stored without one.
    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
    let mut commit = Commit {
        commit_id: String::new(),
        parents,
        manifests: manifest_ids,
        author: author.to_string(),
        message: message.to_string(),
        timestamp,
        public_key: Some(public_key_hex),
        signature: None,
        key_id,
    };

    // 4. Sign — always JSON for deterministic signature
    let json_unsigned = metadata::serialize_for_signing(&commit);
    let signature = signing_key.sign(&json_unsigned);
    commit.signature = Some(hex::encode(signature.to_bytes()));

    // 5. Store commit — use configured format
    let encoded = metadata::serialize(&commit, &fmt);
    let hash = blake3::hash(&encoded);
    let chunk = crate::chunker::Chunk {
        hash,
        data: encoded,
        offset: 0,
    };
    store.put_chunk(&chunk)?;

    // 6. Cycle detection: verify no parent chain already contains this commit
    let commit_id = hash.to_hex().to_string();
    if has_dag_cycle(&store, &commit.parents, &commit_id)? {
        anyhow::bail!(
            "Cycle detected: commit {} is already an ancestor of one or more parents",
            commit_id
        );
    }

    // 7. Update HEAD and branch ref
    if let Some(ref branch_name) = current_branch {
        branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
        branch::set_head_branch(&shard_dir, branch_name)?;
    } else {
        // No current branch was resolved: write the commit id straight into HEAD.
        fs::write(&head_path, &commit_id)?;
    }

    // 8. Clear index
    index.files.clear();
    index.save(&shard_dir.join("index"), &fmt)?;

    // WAL: mark commit complete
    wal.append(&wal::WalEntry::CommitEnd)?;
    wal.truncate()?;

    if json {
        info!(
            "{}",
            serde_json::to_string(&serde_json::json!({
                "commit_id": commit_id,
                "message": message,
            }))?
        );
    } else {
        info!("Committed {} ({})", commit_id, message);
    }
    Ok(())
}
460
/// Verifies the integrity and signatures of the commit `commit_id` and everything it references.
///
/// Checks performed, in order:
/// 1. the stored commit object hashes to `commit_id`;
/// 2. if the commit carries a `key_id`, the keychain confirms that key was valid at the
///    commit timestamp;
/// 3. if the commit is signed, its Ed25519 signature verifies over the commit with the
///    signature field cleared;
/// 4. for every manifest: the stored object hashes to its id, its signature (if any)
///    verifies, its recorded merkle root (if any) matches the one recomputed from its chunk
///    list, and every chunk, after optional decryption and decompression, hashes to its id.
///
/// An unsigned commit only produces a log line; it does not fail verification.
///
/// # Arguments
/// * `path` - directory containing the `.shard` directory.
/// * `commit_id` - full hex id of the commit to verify (must be at least 2 characters).
/// * `json` - suppress per-step log lines and log a single JSON summary instead.
///
/// # Errors
/// Returns the first failed check, or any store, decoding, cipher or decompression error.
pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
    let shard_dir = path.join(".shard");
    if !shard_dir.exists() {
        anyhow::bail!("not a shard repository (run `shard init` first)");
    }

    if commit_id.len() < 2 {
        anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
    }
    let store = Store::open(&shard_dir)?;
    let cipher = maybe_load_cipher(&shard_dir)?;
    let commit_data = store.get_chunk(commit_id)?;

    // Self-verify: stored blob hash must equal commit_id (M5)
    let stored_hash = blake3::hash(&commit_data);
    if stored_hash.to_hex().to_string() != commit_id {
        anyhow::bail!("commit object hash mismatch: stored content does not match its hash — data may be corrupted");
    }

    let commit: Commit = metadata::deserialize(&commit_data)?;

    // Reported in the JSON summary.
    let mut sig_verified = false;
    let mut files_checked = 0u64;

    // Verify the signing key was valid at commit time
    if let Some(kid) = &commit.key_id {
        if let Err(e) = keychain::key_was_valid_at(&shard_dir.join("keys"), kid, commit.timestamp) {
            anyhow::bail!("Keychain verification failed: {}", e);
        } else if !json {
            info!("Keychain: key {} was active at commit time.", kid);
        }
    }

    if let Some(sig_hex) = &commit.signature {
        // Prefer the public key embedded in the commit; fall back to the repository's key.
        let verifying_key = if let Some(pk_hex) = &commit.public_key {
            let pk_bytes = hex::decode(pk_hex)?;
            ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
        } else {
            let pub_bytes = load_public_key(&shard_dir)?;
            ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
        };

        // Rebuild the bytes that were signed: the commit with its signature cleared.
        let mut unsigned_commit = commit.clone();
        unsigned_commit.signature = None;
        let json_unsigned = metadata::serialize_for_signing(&unsigned_commit);

        let sig_bytes = hex::decode(sig_hex)?;
        let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);

        verifying_key.verify(&json_unsigned, &signature)?;
        sig_verified = true;
        if !json {
            info!("Signature verified.");
        }
    } else if !json {
        info!("Warning: Commit is unsigned.");
    }

    for manifest_id in &commit.manifests {
        let manifest_data = store.get_chunk(manifest_id)?;
        let hash = blake3::hash(&manifest_data);
        if hash.to_hex().to_string() != *manifest_id {
            anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
        }

        let manifest: FileManifest = metadata::deserialize(&manifest_data)?;

        // Verify manifest signature (defense-in-depth; commit signature already covers manifest_id)
        if let Some(sig_hex) = &manifest.signature {
            // Same key selection as for the commit signature above.
            let pk_bytes = if let Some(pk_hex) = &commit.public_key {
                hex::decode(pk_hex)?
            } else {
                load_public_key(&shard_dir)?
            };
            let vk = ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?;
            let mut unsigned = manifest.clone();
            unsigned.signature = None;
            let canonical = metadata::serialize_for_signing(&unsigned);
            let sig_bytes = hex::decode(sig_hex)?;
            let sig = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
            vk.verify(&canonical, &sig)?;
            if !json {
                info!("  Manifest signature verified for: {}", manifest.name);
            }
        }

        let compression = manifest.compression.parse::<Compression>()?;
        if !json {
            info!(
                "Verifying file: {} (compression: {})",
                manifest.name, manifest.compression
            );
        }

        // Verify merkle_root if present
        if let Some(ref mr) = manifest.merkle_root {
            let computed = FileManifest::merkle_root(&manifest.chunks);
            if mr != &computed {
                anyhow::bail!(
                    "merkle root mismatch for '{}': manifest says {} but computed {}",
                    manifest.name,
                    mr,
                    computed
                );
            }
        }

        // Chunk ids are hashes of the plaintext, so undo encryption and compression
        // before hashing.
        for chunk_id in &manifest.chunks {
            let chunk_data = store.get_chunk(chunk_id)?;
            let decrypted = match &cipher {
                Some(c) => c.decrypt(&chunk_data)?,
                None => chunk_data,
            };
            let decompressed = compression.decompress(&decrypted)?;
            let hash = blake3::hash(&decompressed);
            if hash.to_hex().to_string() != *chunk_id {
                anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
            }
        }
        files_checked += 1;
    }

    if json {
        info!(
            "{}",
            serde_json::to_string(&serde_json::json!({
                "commit_id": commit_id,
                "verified": true,
                "signature_verified": sig_verified,
                "files_checked": files_checked,
            }))?
        );
    } else {
        info!("Verification successful.");
    }
    Ok(())
}
598
599fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
600    if commit_id.len() < 2 {
601        anyhow::bail!(
602            "commit id too short (got {} chars, need at least 2): '{}'",
603            commit_id.len(),
604            commit_id
605        );
606    };
607    let data = store.get_chunk(commit_id)?;
608    let mut commit: Commit = metadata::deserialize(&data)?;
609    commit.commit_id = commit_id.to_string();
610    Ok(commit)
611}
612
613fn has_dag_cycle(store: &Store, parents: &[String], commit_id: &str) -> Result<bool> {
614    let mut seen = std::collections::HashSet::new();
615    let mut stack: Vec<String> = parents.to_vec();
616    while let Some(cid) = stack.pop() {
617        if cid == commit_id {
618            return Ok(true);
619        }
620        if !seen.insert(cid.clone()) {
621            continue;
622        }
623        if let Ok(data) = store.get_chunk(&cid) {
624            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
625                for p in &commit.parents {
626                    stack.push(p.clone());
627                }
628            }
629        }
630    }
631    Ok(false)
632}
633
/// Serializable view of a commit, built from a `Commit` and used by `log_cmd` for both its
/// JSON output and its text rendering.
#[derive(Serialize)]
pub struct LogEntry {
    /// Identifier of the commit this entry describes.
    pub commit_id: String,
    /// Identifiers of the parent commits; empty for a commit without parents.
    pub parents: Vec<String>,
    /// Identifiers of the file manifests recorded by the commit.
    pub manifests: Vec<String>,
    /// Author string recorded on the commit.
    pub author: String,
    /// Commit message; may span several lines.
    pub message: String,
    /// Commit time in seconds since the Unix epoch.
    pub timestamp: u64,
    /// Hex-encoded commit signature, or `None` when the commit is unsigned.
    pub signature: Option<String>,
}
644
645impl From<Commit> for LogEntry {
646    fn from(c: Commit) -> Self {
647        LogEntry {
648            commit_id: c.commit_id,
649            parents: c.parents,
650            manifests: c.manifests,
651            author: c.author,
652            message: c.message,
653            timestamp: c.timestamp,
654            signature: c.signature,
655        }
656    }
657}
658
659pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
660    let shard_dir = path.join(".shard");
661    if !shard_dir.exists() {
662        anyhow::bail!("not a shard repository (run `shard init` first)");
663    }
664
665    let store = Store::open(&shard_dir)?;
666
667    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
668    let head = head_commit
669        .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
670
671    let mut entries: Vec<LogEntry> = Vec::new();
672    let mut seen = std::collections::HashSet::new();
673    let mut stack = vec![head];
674
675    while let Some(cid) = stack.pop() {
676        if !seen.insert(cid.clone()) {
677            continue;
678        }
679        let commit = load_commit(&store, &cid)?;
680        for parent in &commit.parents {
681            stack.push(parent.clone());
682        }
683        entries.push(commit.into());
684    }
685
686    if json {
687        info!("{}", serde_json::to_string_pretty(&entries)?);
688    } else {
689        for entry in &entries {
690            let ts = {
691                let secs = entry.timestamp as i64;
692                let tm = time::OffsetDateTime::from_unix_timestamp(secs)
693                    .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
694                tm.format(&time::format_description::well_known::Rfc3339)
695                    .unwrap_or_else(|_| entry.timestamp.to_string())
696            };
697            info!("commit {}", entry.commit_id);
698            if !entry.parents.is_empty() {
699                info!("parents: {}", entry.parents.join(" "));
700            }
701            info!("author: {}", entry.author);
702            info!("date:   {}", ts);
703            info!("");
704            for line in entry.message.lines() {
705                info!("    {}", line);
706            }
707            info!("");
708        }
709    }
710
711    Ok(())
712}
713
714fn reconstruct_file(
715    store: &Store,
716    manifest: &FileManifest,
717    cipher: Option<&encryption::RepoCipher>,
718) -> Result<Vec<u8>> {
719    let compression: Compression = manifest.compression.parse()?;
720    let mut data = Vec::new();
721    for chunk_id in &manifest.chunks {
722        let chunk_data = store.get_chunk(chunk_id)?;
723        let decrypted = match cipher {
724            Some(c) => c.decrypt(&chunk_data)?,
725            None => chunk_data,
726        };
727        let decompressed = compression.decompress(&decrypted)?;
728        data.extend_from_slice(&decompressed);
729    }
730    Ok(data)
731}
732
/// Logs the differences between two commits, file by file.
///
/// `commit_a` is the "old" side. `commit_b` is the "new" side and defaults to the commit HEAD
/// resolves to when `None`. Files are matched by manifest name and visited in sorted name
/// order:
/// * present only in the new commit: reported as added, with every line;
/// * present only in the old commit: reported as removed, with every line;
/// * present in both: skipped when the chunk lists are equal, otherwise reported as modified
///   (a unified diff in text mode, the full old and new line lists in JSON mode).
///
/// File contents are decoded with `String::from_utf8_lossy`, so invalid UTF-8 is shown with
/// replacement characters.
///
/// # Arguments
/// * `path` - directory containing the `.shard` directory.
/// * `commit_a` - revision for the old side, resolved through `branch::resolve_rev`.
/// * `commit_b` - optional revision for the new side.
/// * `json` - log a single `{"changes": [...]}` object instead of diff text.
///
/// # Errors
/// Fails when the repository is missing, a revision cannot be resolved or loaded, there is no
/// HEAD commit to default to, or a manifest or chunk cannot be read.
pub fn diff(path: &Path, commit_a: &str, commit_b: Option<&str>, json: bool) -> Result<()> {
    let shard_dir = path.join(".shard");
    if !shard_dir.exists() {
        anyhow::bail!("not a shard repository (run `shard init` first)");
    }

    let store = Store::open(&shard_dir)?;
    let cipher = maybe_load_cipher(&shard_dir)?;

    // The new side is resolved first, so an error there is reported before one for `commit_a`.
    let cid_b = match commit_b {
        Some(c) => branch::resolve_rev(&shard_dir, c)?,
        None => {
            let (_, head) = branch::resolve_head(&shard_dir)?;
            head.ok_or_else(|| anyhow::anyhow!("no commits yet"))?
        }
    };
    let cid_a = branch::resolve_rev(&shard_dir, commit_a)?;

    let commit1 = load_commit(&store, &cid_a)?;
    let commit2 = load_commit(&store, &cid_b)?;

    // Manifests of the old commit, keyed by file name.
    let mut files1: HashMap<String, FileManifest> = HashMap::new();
    for mid in &commit1.manifests {
        let data = store.get_chunk(mid)?;
        let m: FileManifest = metadata::deserialize(&data)?;
        files1.insert(m.name.clone(), m);
    }

    // Manifests of the new commit, keyed by file name.
    let mut files2: HashMap<String, FileManifest> = HashMap::new();
    for mid in &commit2.manifests {
        let data = store.get_chunk(mid)?;
        let m: FileManifest = metadata::deserialize(&data)?;
        files2.insert(m.name.clone(), m);
    }

    // Union of both name sets, sorted and de-duplicated so the output order is stable.
    let mut all_names: Vec<&String> = files1.keys().chain(files2.keys()).collect();
    all_names.sort();
    all_names.dedup();

    // `changes` is only filled in JSON mode; `diff_found` tracks whether anything differed.
    let mut changes: Vec<serde_json::Value> = Vec::new();
    let mut diff_found = false;

    for name in all_names {
        match (files1.get(name), files2.get(name)) {
            // File exists only in the new commit.
            (None, Some(manifest)) => {
                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
                let text = String::from_utf8_lossy(&content);
                diff_found = true;
                if json {
                    changes.push(serde_json::json!({
                        "type": "added",
                        "file": name,
                        "lines": text.lines().collect::<Vec<_>>(),
                    }));
                } else {
                    info!("--- /dev/null");
                    info!("+++ b/{}", name);
                    let lines: Vec<&str> = text.lines().collect();
                    info!("@@ -0,0 +1,{} @@", lines.len());
                    for line in &lines {
                        info!("+{}", line);
                    }
                }
            }
            // File exists only in the old commit.
            (Some(manifest), None) => {
                let content = reconstruct_file(&store, manifest, cipher.as_ref())?;
                let text = String::from_utf8_lossy(&content);
                diff_found = true;
                if json {
                    changes.push(serde_json::json!({
                        "type": "removed",
                        "file": name,
                        "lines": text.lines().collect::<Vec<_>>(),
                    }));
                } else {
                    info!("--- a/{}", name);
                    info!("+++ /dev/null");
                    let lines: Vec<&str> = text.lines().collect();
                    info!("@@ -1,{} +0,0 @@", lines.len());
                    for line in &lines {
                        info!("-{}", line);
                    }
                }
            }
            // File exists in both commits.
            (Some(ma), Some(mb)) => {
                // Equal chunk lists are treated as unchanged, without reading any content.
                if ma.chunks == mb.chunks {
                    continue;
                }
                let content_a = reconstruct_file(&store, ma, cipher.as_ref())?;
                let content_b = reconstruct_file(&store, mb, cipher.as_ref())?;
                diff_found = true;
                if json {
                    let text_a = String::from_utf8_lossy(&content_a);
                    let text_b = String::from_utf8_lossy(&content_b);
                    changes.push(serde_json::json!({
                        "type": "modified",
                        "file": name,
                        "old_lines": text_a.lines().collect::<Vec<_>>(),
                        "new_lines": text_b.lines().collect::<Vec<_>>(),
                    }));
                } else {
                    let text_a = String::from_utf8_lossy(&content_a);
                    let text_b = String::from_utf8_lossy(&content_b);
                    let diff = TextDiff::from_lines(text_a.as_ref(), text_b.as_ref());
                    // Render the unified diff into a buffer, then log it line by line.
                    let mut buf: Vec<u8> = Vec::new();
                    diff.unified_diff()
                        .header(&format!("a/{}", name), &format!("b/{}", name))
                        .to_writer(&mut buf)
                        .map_err(|e| anyhow::anyhow!("diff output error: {}", e))?;
                    let output = String::from_utf8_lossy(&buf);
                    for line in output.lines() {
                        info!("{}", line);
                    }
                }
            }
            // Every name comes from one of the two maps, so this arm is not expected to run.
            (None, None) => {}
        }
    }

    if !diff_found {
        if json {
            info!(
                "{}",
                serde_json::to_string(
                    &serde_json::json!({"changes": changes, "message": "no differences"})
                )?
            );
        } else {
            info!("No differences between the commits.");
        }
        return Ok(());
    }

    if json {
        info!(
            "{}",
            serde_json::to_string(&serde_json::json!({"changes": changes}))?
        );
    }

    Ok(())
}
875
876pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
877    let shard_dir = path.join(".shard");
878    if !shard_dir.exists() {
879        anyhow::bail!("not a shard repository (run `shard init` first)");
880    }
881
882    let store = Store::open(&shard_dir)?;
883    let cipher = maybe_load_cipher(&shard_dir)?;
884
885    // Resolve target: branch name or commit id
886    // Validate BEFORE writing to HEAD to avoid corruption on failure
887    let branch_path = shard_dir.join("refs").join("heads").join(target);
888    let commit_id = if branch_path.exists() {
889        fs::read_to_string(&branch_path)?.trim().to_string()
890    } else {
891        target.to_string()
892    };
893
894    // Validate commit exists before touching HEAD
895    let commit = load_commit(&store, &commit_id)?;
896
897    // Only update HEAD after validation succeeds
898    if branch_path.exists() {
899        branch::set_head_branch(&shard_dir, target)?;
900    } else {
901        branch::set_head_commit(&shard_dir, target)?;
902    }
903    let mut files = Vec::new();
904
905    for manifest_id in &commit.manifests {
906        let data = store.get_chunk(manifest_id)?;
907        let hash = blake3::hash(&data);
908        if hash.to_hex().to_string() != *manifest_id {
909            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
910        }
911        let manifest: FileManifest = metadata::deserialize(&data)?;
912        let compression = manifest.compression.parse::<Compression>()?;
913        if !json {
914            info!(
915                "Checking out file: {} (compression: {})",
916                manifest.name, manifest.compression
917            );
918        }
919
920        let mut file_data = Vec::new();
921        for chunk_id in &manifest.chunks {
922            let chunk_data = store.get_chunk(chunk_id)?;
923            let decrypted = match &cipher {
924                Some(c) => c.decrypt(&chunk_data)?,
925                None => chunk_data,
926            };
927            let decompressed = compression.decompress(&decrypted)?;
928            file_data.extend_from_slice(&decompressed);
929        }
930        fs::write(path.join(&manifest.name), file_data)?;
931        if !json {
932            info!("  -> {}", manifest.name);
933        }
934        files.push(manifest.name);
935    }
936
937    if json {
938        info!(
939            "{}",
940            serde_json::to_string(&serde_json::json!({
941                "commit_id": commit_id,
942                "files": files,
943            }))?
944        );
945    } else {
946        info!("Checkout complete.");
947    }
948    Ok(())
949}
950
951pub fn status(path: &Path, json: bool) -> Result<()> {
952    let shard_dir = path.join(".shard");
953    if !shard_dir.exists() {
954        anyhow::bail!("not a shard repository (run `shard init` first)");
955    }
956
957    let config = load_config(&shard_dir)?;
958    let fmt = MetadataFormat::from_config(&config);
959
960    let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
961    let mut commit_id: Option<String> = None;
962    if let Some(cid) = head_commit {
963        commit_id = Some(cid);
964        if !json {
965            if let Some(ref branch) = current_branch {
966                info!("On branch: {}", branch);
967            } else {
968                info!("HEAD detached at {}", commit_id.as_ref().unwrap());
969            }
970        }
971    } else if !json {
972        info!("No commits yet.");
973    }
974
975    let index = Index::load(&shard_dir.join("index"), &fmt)?;
976    let staged: Vec<String> = index.files.keys().cloned().collect();
977    if !json {
978        if staged.is_empty() {
979            info!("Nothing staged.");
980        } else {
981            info!("\nStaged files:");
982            for name in &staged {
983                info!("  {} (to be committed)", name);
984            }
985        }
986    }
987
988    let store = Store::open(&shard_dir)?;
989    let mut deleted = Vec::new();
990    let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
991        let mut names = std::collections::HashSet::new();
992        if let Ok(commit) = load_commit(&store, head) {
993            for manifest_id in &commit.manifests {
994                if let Ok(data) = store.get_chunk(manifest_id) {
995                    if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
996                        let file_path = path.join(&manifest.name);
997                        if !file_path.exists() {
998                            deleted.push(manifest.name.clone());
999                        }
1000                        names.insert(manifest.name);
1001                    }
1002                }
1003            }
1004        }
1005        names
1006    } else {
1007        std::collections::HashSet::new()
1008    };
1009
1010    if !json && !deleted.is_empty() {
1011        info!("\nDeleted files:");
1012        for name in &deleted {
1013            info!("  {} (deleted)", name);
1014        }
1015    }
1016
1017    let mut untracked = Vec::new();
1018    for entry in walkdir::WalkDir::new(path)
1019        .min_depth(1)
1020        .into_iter()
1021        .filter_map(|e| e.ok())
1022    {
1023        if entry.file_type().is_file() || entry.file_type().is_dir() {
1024            let rel_path = entry.path().strip_prefix(path).unwrap_or(entry.path());
1025            let name = rel_path.to_string_lossy().to_string();
1026            let is_hidden = rel_path
1027                .components()
1028                .any(|c| c.as_os_str().to_string_lossy().starts_with('.'));
1029            if !is_hidden
1030                && entry.path() != shard_dir
1031                && !entry.path().starts_with(&shard_dir)
1032                && !index.files.contains_key(&name)
1033                && !tracked_names.contains(&name)
1034                && entry.file_type().is_file()
1035            {
1036                untracked.push(name);
1037            }
1038        }
1039    }
1040    if !json && !untracked.is_empty() {
1041        info!("\nUntracked files:");
1042        for name in &untracked {
1043            info!("  {}", name);
1044        }
1045    }
1046
1047    if json {
1048        info!(
1049            "{}",
1050            serde_json::to_string(&serde_json::json!({
1051                "commit": commit_id,
1052                "staged": staged,
1053                "deleted": deleted,
1054                "untracked": untracked,
1055            }))?
1056        );
1057    }
1058
1059    Ok(())
1060}
1061
1062fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1063    let config_path = shard_dir.join("config.json");
1064    if config_path.exists() {
1065        let data = fs::read(&config_path)?;
1066        Ok(metadata::deserialize(&data)?)
1067    } else {
1068        Ok(std::collections::BTreeMap::new())
1069    }
1070}
1071
1072fn save_config(
1073    shard_dir: &Path,
1074    config: &std::collections::BTreeMap<String, String>,
1075) -> Result<()> {
1076    let fmt = MetadataFormat::from_config(config);
1077    let data = metadata::serialize(config, &fmt);
1078    fs::write(shard_dir.join("config.json"), data)?;
1079    Ok(())
1080}
1081
/// Returns the per-user key directory, `$HOME/.shard/keys`.
///
/// Returns `None` when `HOME` is unset or empty. An empty `HOME` would otherwise produce
/// the *relative* path `.shard/keys`, which resolves against the current directory.
/// The variable is read as an `OsString`, so a home directory that is not valid UTF-8
/// is still honoured.
fn global_keys_dir() -> Option<PathBuf> {
    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(|home| PathBuf::from(home).join(".shard").join("keys"))
}
1087
1088fn load_keypair(shard_dir: &Path) -> Result<KeyPair> {
1089    let local = shard_dir.join("keys");
1090    if local.join("secret.key").exists() {
1091        return KeyPair::load(&local);
1092    }
1093    if let Some(global) = global_keys_dir() {
1094        if global.join("secret.key").exists() {
1095            return KeyPair::load(&global);
1096        }
1097    }
1098    anyhow::bail!("no keypair found in {} or ~/.shard/keys/", local.display())
1099}
1100
1101fn load_public_key(shard_dir: &Path) -> Result<Vec<u8>> {
1102    let local = shard_dir.join("keys/public.key");
1103    if local.exists() {
1104        return Ok(fs::read(&local)?);
1105    }
1106    if let Some(global) = global_keys_dir() {
1107        let gp = global.join("public.key");
1108        if gp.exists() {
1109            return Ok(fs::read(&gp)?);
1110        }
1111    }
1112    anyhow::bail!(
1113        "no public key found in {} or ~/.shard/keys/",
1114        local.display()
1115    )
1116}
1117
1118fn maybe_load_cipher(shard_dir: &Path) -> Result<Option<encryption::RepoCipher>> {
1119    let config = load_config(shard_dir)?;
1120    if config.get("private").map(|s| s.as_str()) == Some("true") {
1121        let key = encryption::load_repo_key(&shard_dir.join("keys"))?;
1122        Ok(Some(encryption::RepoCipher::from_key(&key)))
1123    } else {
1124        Ok(None)
1125    }
1126}
1127
1128pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
1129    let shard_dir = path.join(".shard");
1130    if !shard_dir.exists() {
1131        anyhow::bail!("not a shard repository (run `shard init` first)");
1132    }
1133    let config = load_config(&shard_dir)?;
1134    if let Some(key) = key {
1135        match config.get(key) {
1136            Some(value) => info!("{} = {}", key, value),
1137            None => anyhow::bail!("config key not found: {}", key),
1138        }
1139    } else {
1140        for (k, v) in &config {
1141            info!("{} = {}", k, v);
1142        }
1143    }
1144    Ok(())
1145}
1146
1147pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
1148    let shard_dir = path.join(".shard");
1149    if !shard_dir.exists() {
1150        anyhow::bail!("not a shard repository (run `shard init` first)");
1151    }
1152    let mut config = load_config(&shard_dir)?;
1153    config.insert(key.to_string(), value.to_string());
1154    save_config(&shard_dir, &config)?;
1155    info!("{} = {}", key, value);
1156    Ok(())
1157}
1158
1159fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
1160    let tags_path = shard_dir.join("tags.json");
1161    if tags_path.exists() {
1162        let data = fs::read(&tags_path)?;
1163        Ok(serde_json::from_slice(&data)?)
1164    } else {
1165        Ok(std::collections::BTreeMap::new())
1166    }
1167}
1168
1169fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
1170    let data = serde_json::to_string_pretty(tags)?;
1171    fs::write(shard_dir.join("tags.json"), data)?;
1172    Ok(())
1173}
1174
1175pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
1176    let shard_dir = path.join(".shard");
1177    if !shard_dir.exists() {
1178        anyhow::bail!("not a shard repository (run `shard init` first)");
1179    }
1180    // Verify commit exists
1181    let store = Store::open(&shard_dir)?;
1182    load_commit(&store, commit_id)?;
1183    let mut tags = load_tags(&shard_dir)?;
1184    tags.insert(name.to_string(), commit_id.to_string());
1185    save_tags(&shard_dir, &tags)?;
1186    info!("Tagged '{}' -> {}", name, commit_id);
1187    Ok(())
1188}
1189
1190pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
1191    let shard_dir = path.join(".shard");
1192    if !shard_dir.exists() {
1193        anyhow::bail!("not a shard repository (run `shard init` first)");
1194    }
1195    let id = match commit_id {
1196        Some(cid) => cid.to_string(),
1197        None => {
1198            let (_, head) = branch::resolve_head(&shard_dir)?;
1199            head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
1200        }
1201    };
1202    // Verify commit exists
1203    let store = Store::open(&shard_dir)?;
1204    load_commit(&store, &id)?;
1205    branch::create_branch(&shard_dir, name, &id)
1206}
1207
1208pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
1209    let shard_dir = path.join(".shard");
1210    if !shard_dir.exists() {
1211        anyhow::bail!("not a shard repository (run `shard init` first)");
1212    }
1213    branch::delete_branch(&shard_dir, name)
1214}
1215
1216pub fn branch_list(path: &Path) -> Result<()> {
1217    let shard_dir = path.join(".shard");
1218    if !shard_dir.exists() {
1219        anyhow::bail!("not a shard repository (run `shard init` first)");
1220    }
1221    let (current, branches) = branch::list_branches(&shard_dir)?;
1222    if branches.is_empty() {
1223        info!("No branches.");
1224        return Ok(());
1225    }
1226    for (name, commit_id) in &branches {
1227        let prefix = if current.as_deref() == Some(name) {
1228            "* "
1229        } else {
1230            "  "
1231        };
1232        info!(
1233            "{}{} ({})",
1234            prefix,
1235            name,
1236            &commit_id[..8.min(commit_id.len())]
1237        );
1238    }
1239    Ok(())
1240}
1241
1242pub fn merge(path: &Path, branch: &str, message: &str, author: &str, json: bool) -> Result<()> {
1243    let shard_dir = path.join(".shard");
1244    if !shard_dir.exists() {
1245        anyhow::bail!("not a shard repository (run `shard init` first)");
1246    }
1247
1248    let store = Store::open(&shard_dir)?;
1249
1250    let config = load_config(&shard_dir)?;
1251    let fmt = MetadataFormat::from_config(&config);
1252
1253    // Resolve current HEAD
1254    let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
1255    let current_id =
1256        current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
1257
1258    // Resolve source branch
1259    let source_id = branch::resolve_rev(&shard_dir, branch)?;
1260    if source_id == current_id {
1261        anyhow::bail!("Already up to date — source is the same commit as HEAD");
1262    }
1263
1264    // Load both commits
1265    let current_commit = load_commit(&store, &current_id)?;
1266    let source_commit = load_commit(&store, &source_id)?;
1267
1268    // Load manifests from both sides
1269    let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
1270        std::collections::HashMap::new();
1271
1272    for manifest_id in &current_commit.manifests {
1273        let data = store.get_chunk(manifest_id)?;
1274        let manifest: FileManifest = metadata::deserialize(&data)?;
1275        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1276    }
1277
1278    for manifest_id in &source_commit.manifests {
1279        let data = store.get_chunk(manifest_id)?;
1280        let manifest: FileManifest = metadata::deserialize(&data)?;
1281        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
1282    }
1283
1284    // Store merged manifests (signed)
1285    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
1286    let keys = load_keypair(&shard_dir)?;
1287    let signing_key = keys.signing_key;
1288    let mut merged_manifest_ids = Vec::new();
1289    for (name, chunks) in merged_manifests.values() {
1290        let compression = Compression::None;
1291        let mut manifest = FileManifest {
1292            name: name.clone(),
1293            size: 0,
1294            chunks: chunks.clone(),
1295            content_type: None,
1296            compression: compression.as_str().to_string(),
1297            merkle_root: Some(FileManifest::merkle_root(chunks)),
1298            created_by: Some(author.to_string()),
1299            created_at: Some(timestamp),
1300            signature: None,
1301        };
1302
1303        let mut unsigned = manifest.clone();
1304        unsigned.signature = None;
1305        let canonical = metadata::serialize_for_signing(&unsigned);
1306        let sig = signing_key.sign(&canonical);
1307        manifest.signature = Some(hex::encode(sig.to_bytes()));
1308
1309        let encoded = metadata::serialize(&manifest, &fmt);
1310        let hash = blake3::hash(&encoded);
1311        store.put_chunk(&crate::chunker::Chunk {
1312            hash,
1313            data: encoded,
1314            offset: 0,
1315        })?;
1316        merged_manifest_ids.push(hash.to_hex().to_string());
1317    }
1318    merged_manifest_ids.sort();
1319
1320    // Create merge commit
1321    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
1322    let key_id = keychain::get_current_key_id(&shard_dir.join("keys")).ok();
1323    let parents = vec![current_id.clone(), source_id.clone()];
1324    let mut commit = Commit {
1325        commit_id: String::new(),
1326        parents,
1327        manifests: merged_manifest_ids,
1328        author: author.to_string(),
1329        message: message.to_string(),
1330        timestamp,
1331        public_key: Some(public_key_hex),
1332        signature: None,
1333        key_id,
1334    };
1335
1336    let json_unsigned = metadata::serialize_for_signing(&commit);
1337    let signature = signing_key.sign(&json_unsigned);
1338    commit.signature = Some(hex::encode(signature.to_bytes()));
1339
1340    let encoded = metadata::serialize(&commit, &fmt);
1341    let hash = blake3::hash(&encoded);
1342    store.put_chunk(&crate::chunker::Chunk {
1343        hash,
1344        data: encoded,
1345        offset: 0,
1346    })?;
1347
1348    let merge_commit_id = hash.to_hex().to_string();
1349
1350    // Cycle detection on merge
1351    if has_dag_cycle(&store, &commit.parents, &merge_commit_id)? {
1352        anyhow::bail!(
1353            "Cycle detected in merge: commit {} is already an ancestor of one or more parents",
1354            merge_commit_id
1355        );
1356    }
1357
1358    // Update HEAD and branch ref
1359    if let Some(ref branch_name) = current_branch {
1360        branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
1361        branch::set_head_branch(&shard_dir, branch_name)?;
1362    } else {
1363        branch::set_head_commit(&shard_dir, &merge_commit_id)?;
1364    }
1365
1366    if json {
1367        info!(
1368            "{}",
1369            serde_json::to_string(&serde_json::json!({
1370                "commit_id": merge_commit_id,
1371                "message": message,
1372            }))?
1373        );
1374    } else {
1375        info!("Merge commit {} ({})", merge_commit_id, message);
1376    }
1377    Ok(())
1378}
1379
1380pub fn tag_list(path: &Path) -> Result<()> {
1381    let shard_dir = path.join(".shard");
1382    if !shard_dir.exists() {
1383        anyhow::bail!("not a shard repository (run `shard init` first)");
1384    }
1385    let tags = load_tags(&shard_dir)?;
1386    if tags.is_empty() {
1387        info!("No tags.");
1388    } else {
1389        for (name, commit_id) in &tags {
1390            info!("{} -> {}", name, commit_id);
1391        }
1392    }
1393    Ok(())
1394}
1395
1396fn collect_reachable(
1397    store: &Store,
1398    commit_id: &str,
1399    seen_commits: &mut std::collections::HashSet<String>,
1400    reachable: &mut std::collections::HashSet<String>,
1401) -> Result<()> {
1402    if !seen_commits.insert(commit_id.to_string()) {
1403        return Ok(());
1404    }
1405
1406    reachable.insert(commit_id.to_string());
1407
1408    let commit = match load_commit(store, commit_id) {
1409        Ok(c) => c,
1410        Err(_) => return Ok(()),
1411    };
1412
1413    for manifest_id in &commit.manifests {
1414        reachable.insert(manifest_id.clone());
1415
1416        if let Ok(data) = store.get_chunk(manifest_id) {
1417            if let Ok(manifest) = metadata::deserialize::<FileManifest>(&data) {
1418                for chunk_id in &manifest.chunks {
1419                    reachable.insert(chunk_id.clone());
1420                }
1421            }
1422        }
1423    }
1424
1425    for parent_id in &commit.parents {
1426        collect_reachable(store, parent_id, seen_commits, reachable)?;
1427    }
1428
1429    Ok(())
1430}
1431
1432pub fn prune(path: &Path, json: bool) -> Result<()> {
1433    let shard_dir = path.join(".shard");
1434    if !shard_dir.exists() {
1435        anyhow::bail!("not a shard repository (run `shard init` first)");
1436    }
1437
1438    let config = load_config(&shard_dir)?;
1439    let fmt = MetadataFormat::from_config(&config);
1440
1441    let store = Store::open(&shard_dir)?;
1442    let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
1443
1444    // 1. Walk from HEAD commit (and all branch tips)
1445    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
1446    if let Some(ref head) = head_commit {
1447        collect_reachable(
1448            &store,
1449            head,
1450            &mut std::collections::HashSet::new(),
1451            &mut reachable,
1452        )?;
1453    }
1454
1455    // Also walk from all branch refs (in case HEAD is detached from any branch)
1456    if let Ok(branches) = branch::list_branches(&shard_dir) {
1457        for (_, commit_id) in branches.1 {
1458            collect_reachable(
1459                &store,
1460                &commit_id,
1461                &mut std::collections::HashSet::new(),
1462                &mut reachable,
1463            )?;
1464        }
1465    }
1466
1467    // 2. Walk from tags
1468    let tags = load_tags(&shard_dir)?;
1469    for commit_id in tags.values() {
1470        collect_reachable(
1471            &store,
1472            commit_id,
1473            &mut std::collections::HashSet::new(),
1474            &mut reachable,
1475        )?;
1476    }
1477
1478    // 3. Walk from index (staged files)
1479    let index = Index::load(&shard_dir.join("index"), &fmt)?;
1480    for manifest in index.files.values() {
1481        let json = metadata::serialize(manifest, &fmt);
1482        let hash = blake3::hash(&json);
1483        let hash_hex = hash.to_hex().to_string();
1484        reachable.insert(hash_hex);
1485        for chunk_hash in &manifest.chunks {
1486            reachable.insert(chunk_hash.clone());
1487        }
1488    }
1489
1490    // 4. Scan objects and remove unreachable
1491    let all_chunks = store.iter_chunks()?;
1492    let mut pruned = 0u64;
1493    let mut kept = 0u64;
1494    for (hash_hex, full_path) in &all_chunks {
1495        if !reachable.contains(hash_hex) {
1496            store.delete_chunk(hash_hex, Some(full_path))?;
1497            pruned += 1;
1498        } else {
1499            kept += 1;
1500        }
1501    }
1502
1503    if json {
1504        info!(
1505            "{}",
1506            serde_json::to_string(&serde_json::json!({
1507                "pruned": pruned,
1508                "remaining": kept,
1509            }))?
1510        );
1511    } else {
1512        info!("Pruned {} objects. {} objects remain.", pruned, kept);
1513    }
1514    Ok(())
1515}
1516
1517pub fn peer_add(path: &Path, multiaddr: &str, json: bool) -> Result<()> {
1518    let shard_dir = path.join(".shard");
1519    if !shard_dir.exists() {
1520        anyhow::bail!("not a shard repository (run `shard init` first)");
1521    }
1522
1523    // Validate multiaddr format
1524    if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1525        anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1526    }
1527
1528    let peers_path = shard_dir.join("peers.json");
1529    let mut peers: Vec<String> = if peers_path.exists() {
1530        let data = fs::read(&peers_path)?;
1531        serde_json::from_slice(&data)?
1532    } else {
1533        Vec::new()
1534    };
1535
1536    let added = if !peers.contains(&multiaddr.to_string()) {
1537        peers.push(multiaddr.to_string());
1538        let data = serde_json::to_vec(&peers)?;
1539        fs::write(peers_path, data)?;
1540        true
1541    } else {
1542        false
1543    };
1544
1545    if json {
1546        info!(
1547            "{}",
1548            serde_json::to_string(&serde_json::json!({
1549                "multiaddr": multiaddr,
1550                "added": added,
1551            }))?
1552        );
1553    } else if added {
1554        info!("Added peer: {}", multiaddr);
1555    } else {
1556        info!("Peer already exists: {}", multiaddr);
1557    }
1558
1559    Ok(())
1560}
1561
1562fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1563    let peers_path = shard_dir.join("peers.json");
1564    if peers_path.exists() {
1565        let data = fs::read(peers_path)?;
1566        Ok(serde_json::from_slice(&data)?)
1567    } else {
1568        Ok(Vec::new())
1569    }
1570}
1571
/// Location of the authorized-keys file inside a `.shard` directory.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut keys_file = shard_dir.to_path_buf();
    keys_file.push("authorized_keys");
    keys_file
}
1575
1576fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1577    let path = authorized_keys_path(shard_dir);
1578    if !path.exists() {
1579        return Ok(Vec::new());
1580    }
1581    let content = fs::read_to_string(&path)?;
1582    let mut keys = Vec::new();
1583    for line in content.lines() {
1584        let line = line.trim();
1585        if line.is_empty() || line.starts_with('#') {
1586            continue;
1587        }
1588        let bytes = hex::decode(line)?;
1589        let arr: [u8; 32] = bytes
1590            .as_slice()
1591            .try_into()
1592            .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1593        keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1594    }
1595    Ok(keys)
1596}
1597
1598pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1599    // Validate the key
1600    let bytes = hex::decode(public_key_hex)?;
1601    let arr: [u8; 32] = bytes
1602        .as_slice()
1603        .try_into()
1604        .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1605    let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1606
1607    let path = authorized_keys_path(shard_dir);
1608    let mut content = if path.exists() {
1609        fs::read_to_string(&path)?
1610    } else {
1611        String::new()
1612    };
1613    // Check if key already exists
1614    if content.lines().any(|l| l.trim() == public_key_hex) {
1615        info!("Key already authorized");
1616        return Ok(());
1617    }
1618    content.push_str(public_key_hex);
1619    content.push('\n');
1620    fs::write(&path, content)?;
1621    info!("Authorized key added");
1622    Ok(())
1623}
1624
1625pub fn backup(path: &Path, output: &Path, json: bool) -> Result<()> {
1626    let shard_dir = path.join(".shard");
1627    if !shard_dir.exists() {
1628        anyhow::bail!("not a shard repository (run `shard init` first)");
1629    }
1630    let file = fs::File::create(output)?;
1631    let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1632    let mut archive = tar::Builder::new(encoder);
1633    archive.append_dir_all(".shard", &shard_dir)?;
1634    archive.finish()?;
1635    if json {
1636        info!(
1637            "{}",
1638            serde_json::to_string(&serde_json::json!({
1639                "path": output.to_string_lossy(),
1640            }))?
1641        );
1642    } else {
1643        info!("Backup created: {}", output.display());
1644    }
1645    Ok(())
1646}
1647
1648pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1649    let shard_dir = path.join(".shard");
1650    if !shard_dir.exists() {
1651        anyhow::bail!("not a shard repository (run `shard init` first)");
1652    }
1653    let store = Store::open(&shard_dir)?;
1654    let cipher = maybe_load_cipher(&shard_dir)?;
1655    let commit = load_commit(&store, commit_id)?;
1656    let mut files = Vec::new();
1657    for manifest_id in &commit.manifests {
1658        let data = store.get_chunk(manifest_id)?;
1659        let manifest: FileManifest = metadata::deserialize(&data)?;
1660        let compression = manifest.compression.parse::<Compression>()?;
1661        if !json {
1662            info!("Exporting file: {}", manifest.name);
1663        }
1664        let mut file_data = Vec::new();
1665        for chunk_id in &manifest.chunks {
1666            let chunk_data = store.get_chunk(chunk_id)?;
1667            let decrypted = match &cipher {
1668                Some(c) => c.decrypt(&chunk_data)?,
1669                None => chunk_data,
1670            };
1671            let decompressed = compression.decompress(&decrypted)?;
1672            file_data.extend_from_slice(&decompressed);
1673        }
1674        let out_path = output_dir.join(&manifest.name);
1675        if let Some(parent) = out_path.parent() {
1676            fs::create_dir_all(parent)?;
1677        }
1678        fs::write(&out_path, file_data)?;
1679        if !json {
1680            info!("  -> {}", out_path.display());
1681        }
1682        files.push(manifest.name);
1683    }
1684    if json {
1685        info!(
1686            "{}",
1687            serde_json::to_string(&serde_json::json!({
1688                "commit_id": commit_id,
1689                "files": files,
1690                "output_dir": output_dir.to_string_lossy(),
1691            }))?
1692        );
1693    } else {
1694        info!("Export complete.");
1695    }
1696    Ok(())
1697}
1698
1699pub fn import(
1700    path: &Path,
1701    source_dir: &Path,
1702    message: &str,
1703    author: &str,
1704    json: bool,
1705) -> Result<()> {
1706    let shard_dir = path.join(".shard");
1707    if !shard_dir.exists() {
1708        anyhow::bail!("not a shard repository (run `shard init` first)");
1709    }
1710    // Walk files in source_dir
1711    let config = load_config(&shard_dir)?;
1712    let compression: Compression = config
1713        .get("compression")
1714        .map(|s| s.as_str())
1715        .unwrap_or("zstd")
1716        .parse()?;
1717    let chunker_mode = chunker::ChunkerMode::from_config(&config);
1718    let fmt = MetadataFormat::from_config(&config);
1719    let store = Store::open(&shard_dir)?;
1720    let cipher = maybe_load_cipher(&shard_dir)?;
1721    let mut index = Index::load(&shard_dir.join("index"), &fmt)?;
1722    if !source_dir.is_dir() {
1723        anyhow::bail!("Source must be a directory");
1724    }
1725    for entry in walkdir::WalkDir::new(source_dir)
1726        .into_iter()
1727        .filter_entry(|e| {
1728            e.file_name()
1729                .to_str()
1730                .map(|s| !s.starts_with('.'))
1731                .unwrap_or(false)
1732        })
1733    {
1734        let entry = entry?;
1735        if entry.file_type().is_file() {
1736            add_file(
1737                path,
1738                entry.path(),
1739                &store,
1740                &mut index,
1741                &compression,
1742                &chunker_mode,
1743                cipher.as_ref(),
1744                json,
1745            )?;
1746        }
1747    }
1748    index.save(&shard_dir.join("index"), &fmt)?;
1749    // Auto-commit
1750    if !index.files.is_empty() {
1751        commit(path, message, author, json)?;
1752    } else if json {
1753        info!(
1754            "{}",
1755            serde_json::to_string(&serde_json::json!({"status": "no files found"}))?
1756        );
1757    } else {
1758        info!("No files found to import.");
1759    }
1760    Ok(())
1761}
1762
1763pub fn restore(path: &Path, backup_file: &Path, json: bool) -> Result<()> {
1764    let shard_dir = path.join(".shard");
1765    if shard_dir.exists() {
1766        anyhow::bail!(
1767            "Repository already exists — remove .shard first or use a different directory"
1768        );
1769    }
1770    let file = fs::File::open(backup_file)?;
1771    let decoder = flate2::read::GzDecoder::new(file);
1772    let mut archive = tar::Archive::new(decoder);
1773    archive.unpack(path)?;
1774    // Verify the result
1775    if !path.join(".shard").exists() {
1776        anyhow::bail!("Backup does not contain a valid .shard directory");
1777    }
1778    if json {
1779        info!(
1780            "{}",
1781            serde_json::to_string(&serde_json::json!({
1782                "backup": backup_file.to_string_lossy(),
1783            }))?
1784        );
1785    } else {
1786        info!("Restored from {}", backup_file.display());
1787    }
1788    Ok(())
1789}
1790
1791struct RepoProvider {
1792    store: Store,
1793    shard_dir: std::path::PathBuf,
1794}
1795
1796impl shard_net::p2p::ShardContentProvider for RepoProvider {
1797    fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1798        self.store.get_chunk(id).ok()
1799    }
1800    fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1801        self.store.get_chunk(id).ok()
1802    }
1803    fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1804        let hash = blake3::hash(data);
1805        let hex = hash.to_hex().to_string();
1806        if hex != id {
1807            return false;
1808        }
1809        self.store
1810            .put_chunk(&crate::chunker::Chunk {
1811                hash,
1812                data: data.to_vec(),
1813                offset: 0,
1814            })
1815            .is_ok()
1816    }
1817    fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1818        use ed25519_dalek::Verifier;
1819        let pk_bytes: [u8; 32] = match public_key.try_into() {
1820            Ok(b) => b,
1821            Err(_) => return false,
1822        };
1823        let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1824            Ok(k) => k,
1825            Err(_) => return false,
1826        };
1827        let sig_bytes: [u8; 64] = match signature.try_into() {
1828            Ok(b) => b,
1829            Err(_) => return false,
1830        };
1831        let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1832        if pk.verify(nonce, &sig).is_err() {
1833            return false;
1834        }
1835        // Check authorized_keys if the file exists
1836        if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1837            if !keys.is_empty() {
1838                return keys.contains(&pk);
1839            }
1840        }
1841        true
1842    }
1843    fn repo_public_key(&self) -> Option<Vec<u8>> {
1844        let keys = load_keypair(&self.shard_dir).ok()?;
1845        Some(keys.verifying_key.to_bytes().to_vec())
1846    }
1847}
1848
1849/// Compute file count and total size for a commit's manifests.
1850fn commit_stats(shard_dir: &Path, commit_id: &str) -> Result<(u64, u64)> {
1851    let store = Store::open(shard_dir)?;
1852    let commit = load_commit(&store, commit_id)?;
1853    let mut file_count = 0u64;
1854    let mut total_size = 0u64;
1855    for mid in &commit.manifests {
1856        if let Ok(data) = store.get_chunk(mid) {
1857            if let Ok(m) = metadata::deserialize::<FileManifest>(&data) {
1858                file_count += 1;
1859                total_size += m.size;
1860            }
1861        }
1862    }
1863    Ok((file_count, total_size))
1864}
1865
1866pub async fn share(path: &Path, json: bool) -> Result<()> {
1867    let shard_dir = path.join(".shard");
1868    if !shard_dir.exists() {
1869        anyhow::bail!("not a shard repository (run `shard init` first)");
1870    }
1871
1872    let mut node = shard_net::p2p::Node::new().await?;
1873
1874    // Subscribe to global announcement topic
1875    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1876    node.subscribe(&ann_topic)?;
1877
1878    // Bootstrap from peers
1879    let peers = load_peers(&shard_dir)?;
1880    for peer in peers {
1881        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1882            let _ = node.swarm.dial(addr);
1883        }
1884    }
1885
1886    // Trigger Kademlia DHT bootstrap
1887    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
1888
1889    node.listen("/ip4/0.0.0.0/tcp/0").await?;
1890    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1891    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1892
1893    let config = load_config(&shard_dir)?;
1894    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1895
1896    let store = Store::open(&shard_dir)?;
1897    let provider = RepoProvider {
1898        store,
1899        shard_dir: shard_dir.clone(),
1900    };
1901
1902    // Publish announcement for HEAD commit
1903    if let Some(head) = branch::resolve_head(&shard_dir)?.1 {
1904        let (file_count, total_size) = commit_stats(&shard_dir, &head)?;
1905        let ann = shard_net::protocol::Announcement {
1906            commit_id: head,
1907            file_count,
1908            total_size,
1909            repo_name: repo_name.clone(),
1910            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
1911        };
1912        let payload = serde_json::to_vec(&ann)?;
1913        let _ = node.publish(&ann_topic, payload);
1914    }
1915
1916    if json {
1917        info!(
1918            "{}",
1919            serde_json::to_string(&serde_json::json!({
1920                "status": "sharing",
1921                "peer_id": node.local_peer_id().to_string(),
1922            }))?
1923        );
1924    } else {
1925        info!("Sharing repository...");
1926    }
1927    node.run(provider).await;
1928
1929    Ok(())
1930}
1931
1932/// Start a circuit relay v2 server for NAT traversal.
1933/// Listens on the given address and forwards traffic between peers.
1934pub async fn relay(listen_addr: &str, json: bool) -> Result<()> {
1935    let mut node = shard_net::p2p::Node::new().await?;
1936    node.listen(listen_addr).await?;
1937    if json {
1938        info!(
1939            "{}",
1940            serde_json::to_string(&serde_json::json!({
1941                "status": "relay active",
1942                "listen": listen_addr,
1943                "peer_id": node.local_peer_id().to_string(),
1944            }))?
1945        );
1946    } else {
1947        info!("Relay server active on {}", listen_addr);
1948        info!("Peer ID: {}", node.local_peer_id());
1949        info!("Ready to accept circuit relay v2 reservations");
1950    }
1951    node.run(EmptyProvider).await;
1952    Ok(())
1953}
1954
1955/// Minimal provider for relay-only mode (no repo content needed).
1956struct EmptyProvider;
1957impl shard_net::p2p::ShardContentProvider for EmptyProvider {
1958    fn get_manifest(&self, _id: &str) -> Option<Vec<u8>> {
1959        None
1960    }
1961    fn get_chunk(&self, _id: &str) -> Option<Vec<u8>> {
1962        None
1963    }
1964    fn put_chunk(&mut self, _id: &str, _data: &[u8]) -> bool {
1965        false
1966    }
1967    fn verify_auth(&self, _public_key: &[u8], _nonce: &[u8], _signature: &[u8]) -> bool {
1968        false
1969    }
1970    fn repo_public_key(&self) -> Option<Vec<u8>> {
1971        None
1972    }
1973}
1974
1975pub async fn sync(path: &Path, _json: bool) -> Result<()> {
1976    let shard_dir = path.join(".shard");
1977    if !shard_dir.exists() {
1978        anyhow::bail!("not a shard repository (run `shard init` first)");
1979    }
1980
1981    let config = load_config(&shard_dir)?;
1982    let repo_id = config.get("repo_id").cloned().unwrap_or_default();
1983    let repo_name = config.get("repo_name").cloned().unwrap_or_default();
1984    let ann_topic = shard_net::libp2p::gossipsub::IdentTopic::new("shard:ann");
1985    let repo_topic =
1986        shard_net::libp2p::gossipsub::IdentTopic::new(format!("/shard/repo/{}", repo_id));
1987
1988    let mut node = shard_net::p2p::Node::new().await?;
1989    node.subscribe(&ann_topic)?;
1990    node.subscribe(&repo_topic)?;
1991    node.listen("/ip4/0.0.0.0/tcp/0").await?;
1992    let listen_addrs: Vec<String> = node.swarm.listeners().map(|a| a.to_string()).collect();
1993    let peer_multiaddr = listen_addrs.first().cloned().unwrap_or_default();
1994
1995    // Bootstrap from configured peers
1996    let peers = load_peers(&shard_dir)?;
1997    for peer in peers {
1998        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1999            let _ = node.swarm.dial(addr);
2000        }
2001    }
2002
2003    // Trigger Kademlia DHT bootstrap
2004    node.swarm.behaviour_mut().kademlia.bootstrap().ok();
2005
2006    // Helper to publish announcement to both global and repo-specific topics
2007    let publish_ann = |node: &mut shard_net::p2p::Node, ann: &shard_net::protocol::Announcement| {
2008        if let Ok(payload) = serde_json::to_vec(ann) {
2009            let _ = node.publish(&ann_topic, payload.clone());
2010            let _ = node.publish(&repo_topic, payload);
2011        }
2012    };
2013
2014    let head_commit = branch::resolve_head(&shard_dir)?.1;
2015
2016    // Initial announce (may fail with InsufficientPeers if no peers yet)
2017    if let Some(ref head) = head_commit {
2018        let (file_count, total_size) = commit_stats(&shard_dir, head)?;
2019        let ann = shard_net::protocol::Announcement {
2020            commit_id: head.clone(),
2021            file_count,
2022            total_size,
2023            repo_name: repo_name.clone(),
2024            peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2025        };
2026        publish_ann(&mut node, &ann);
2027        if !_json {
2028            info!("Announced commit {} on sync topic", head)
2029        }
2030    } else if !_json {
2031        info!("No commits to announce");
2032    }
2033
2034    if !_json {
2035        info!("Syncing on topic with peer id: {}", node.local_peer_id());
2036    }
2037    let _ = std::io::stdout().flush();
2038
2039    let store = Store::open(&shard_dir)?;
2040    let mut provider = RepoProvider {
2041        store,
2042        shard_dir: shard_dir.clone(),
2043    };
2044
2045    let mut interval = tokio::time::interval(Duration::from_secs(5));
2046    let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
2047        HashMap::new();
2048    let mut announce_counts: HashMap<(shard_net::libp2p::PeerId, String), u32> = HashMap::new();
2049    let mut request_counts: HashMap<shard_net::libp2p::PeerId, u32> = HashMap::new();
2050    let path_buf = path.to_path_buf();
2051
2052    loop {
2053        tokio::select! {
2054            _ = tokio::signal::ctrl_c() => {
2055                info!("\nSync shutting down...");
2056                break Ok(());
2057            }
2058            _ = interval.tick() => {
2059                // Periodic rate-limit counter reset (rolling 60s window)
2060                request_counts.clear();
2061                announce_counts.clear();
2062                if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2063                    let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2064                    let ann = shard_net::protocol::Announcement {
2065                        commit_id: head.clone(),
2066                        file_count: fc,
2067                        total_size: ts,
2068                        repo_name: repo_name.clone(),
2069                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2070                    };
2071                    publish_ann(&mut node, &ann);
2072                    info!("Re-announced commit {} on sync topic", head);
2073                }
2074            }
2075            event = node.swarm.select_next_some() => {
2076                match event {
2077                    shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
2078                        info!("Listening on {address:?}");
2079                        let _ = std::io::stdout().flush();
2080                    }
2081                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2082                        shard_net::p2p::ShardBehaviourEvent::Mdns(
2083                            shard_net::libp2p::mdns::Event::Discovered(list),
2084                        ),
2085                    ) => {
2086                        for (peer_id, multiaddr) in list {
2087                            info!("mDNS discovered: {peer_id} {multiaddr}");
2088                            address_book.entry(peer_id).or_default().push(multiaddr.clone());
2089                            node.swarm
2090                                .behaviour_mut()
2091                                .gossipsub
2092                                .add_explicit_peer(&peer_id);
2093                            node.swarm
2094                                .behaviour_mut()
2095                                .kademlia
2096                                .add_address(&peer_id, multiaddr);
2097                        }
2098                    }
2099                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2100                        shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
2101                            list,
2102                        )),
2103                    ) => {
2104                        for (peer_id, _multiaddr) in list {
2105                            info!("mDNS expired: {peer_id}");
2106                            node.swarm
2107                                .behaviour_mut()
2108                                .gossipsub
2109                                .remove_explicit_peer(&peer_id);
2110                        }
2111                    }
2112                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2113                        shard_net::p2p::ShardBehaviourEvent::Gossipsub(
2114                            shard_net::libp2p::gossipsub::Event::Message {
2115                                propagation_source,
2116                                message,
2117                                ..
2118                            },
2119                        ),
2120                    ) => {
2121                        // Parse structured Announcement JSON
2122                        if let Ok(ann) = serde_json::from_slice::<shard_net::protocol::Announcement>(&message.data) {
2123                            let peer = propagation_source;
2124                            let commit_id_owned = ann.commit_id;
2125                            info!(
2126                                "Peer {} announced commit: {} (repo: {}, {} files, {} bytes)",
2127                                peer, commit_id_owned, ann.repo_name, ann.file_count, ann.total_size,
2128                            );
2129                            // Store announcer's multiaddr if we don't have it yet
2130                            if !ann.peer_multiaddr.is_empty() && !address_book.contains_key(&peer) {
2131                                if let Ok(addr) = ann.peer_multiaddr.parse::<shard_net::libp2p::Multiaddr>() {
2132                                    address_book.entry(peer).or_default().push(addr);
2133                                }
2134                            }
2135                            // Rate limiting: track announcement count per peer
2136                            let rate_key = (peer, commit_id_owned.clone());
2137                            if announce_counts.get(&rate_key).copied().unwrap_or(0) > 5 {
2138                                warn!("Rate limit exceeded for peer {} commit {}", peer, commit_id_owned);
2139                            } else {
2140                                *announce_counts.entry(rate_key).or_insert(0) += 1;
2141                                // Reply with our HEAD if different
2142                                let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
2143                                if our_head != commit_id_owned {
2144                                    let (fc, ts) = commit_stats(&shard_dir, &our_head).unwrap_or_default();
2145                                    let reply = shard_net::protocol::Announcement {
2146                                        commit_id: our_head,
2147                                        file_count: fc,
2148                                        total_size: ts,
2149                                        repo_name: repo_name.clone(),
2150                                        peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2151                                    };
2152                                    publish_ann(&mut node, &reply);
2153                                }
2154                                if let Some(addrs) = address_book.get(&peer) {
2155                                    if let Some(addr) = addrs.first() {
2156                                        let multiaddr_str = format!("{}/p2p/{}", addr, peer);
2157                                        let path_clone = path_buf.clone();
2158                                        tokio::spawn(async move {
2159                                            match pull(&path_clone, &multiaddr_str, &commit_id_owned, false).await {
2160                                                Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
2161                                                Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
2162                                            }
2163                                        });
2164                                    }
2165                                }
2166                            }
2167                        }
2168                    }
2169                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2170                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2171                            shard_net::libp2p::request_response::Event::Message { peer, message },
2172                        ),
2173                    ) => {
2174                        if let shard_net::libp2p::request_response::Message::Request {
2175                            request, channel, ..
2176                        } = message
2177                        {
2178                            // Per-peer request rate limiting: max 50 requests in any 60s window
2179                            let req_count = request_counts.entry(peer).or_insert(0u32);
2180                            *req_count += 1;
2181                            if *req_count > 50 {
2182                                warn!("Dropping request from {}: rate limit exceeded", peer);
2183                                // Reset counter periodically (cooldown via interval tick)
2184                            } else {
2185                                info!("Received request from {}", peer);
2186                                node.serve_request(&peer, &mut provider, request, channel);
2187                            }
2188                        } else {
2189                            info!("Received Response from {}", peer);
2190                        }
2191                    }
2192                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2193                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2194                            shard_net::libp2p::request_response::Event::OutboundFailure {
2195                                peer, error, ..
2196                            },
2197                        ),
2198                    ) => {
2199                        error!("Outbound failure to {}: {:?}", peer, error);
2200                    }
2201                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2202                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
2203                            shard_net::libp2p::request_response::Event::InboundFailure {
2204                                peer, error, ..
2205                            },
2206                        ),
2207                    ) => {
2208                        error!("Inbound failure from {}: {:?}", peer, error);
2209                    }
2210                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2211                        shard_net::p2p::ShardBehaviourEvent::Identify(
2212                            shard_net::libp2p::identify::Event::Received { peer_id, info },
2213                        ),
2214                    ) => {
2215                        info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
2216                        for addr in info.listen_addrs {
2217                            address_book.entry(peer_id).or_default().push(addr);
2218                        }
2219                        let _ = std::io::stdout().flush();
2220                    }
2221                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2222                        shard_net::p2p::ShardBehaviourEvent::Identify(event),
2223                    ) => {
2224                        info!("Identify event: {:?}", event);
2225                    }
2226                    shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
2227                        info!("Connection established with {}", peer_id);
2228                        // Only store the address when we dialed (it's the peer's listen addr).
2229                        // For listener connections, send_back_addr is the ephemeral port — useless for dialing back.
2230                        if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
2231                            address_book.entry(peer_id).or_default().push(address.clone());
2232                        }
2233                        node.swarm
2234                            .behaviour_mut()
2235                            .gossipsub
2236                            .add_explicit_peer(&peer_id);
2237                        // Announce HEAD to the newly connected peer
2238                        if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
2239                            let (fc, ts) = commit_stats(&shard_dir, head).unwrap_or_default();
2240                            let ann = shard_net::protocol::Announcement {
2241                                commit_id: head.clone(),
2242                                file_count: fc,
2243                                total_size: ts,
2244                                repo_name: repo_name.clone(),
2245                                peer_multiaddr: format!("{}/p2p/{}", peer_multiaddr, node.local_peer_id()),
2246                            };
2247                            publish_ann(&mut node, &ann);
2248                        }
2249                    }
2250                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2251                        shard_net::p2p::ShardBehaviourEvent::Relay(event),
2252                    ) => {
2253                        info!("Relay event: {:?}", event);
2254                    }
2255                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2256                        shard_net::p2p::ShardBehaviourEvent::Dcutr(event),
2257                    ) => {
2258                        info!("DCUtR event: {:?}", event);
2259                    }
2260                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
2261                        shard_net::p2p::ShardBehaviourEvent::Autonat(event),
2262                    ) => {
2263                        info!("AutoNAT event: {:?}", event);
2264                    }
2265                    shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
2266                        local_addr,
2267                        send_back_addr,
2268                        ..
2269                    } => {
2270                        info!(
2271                            "Incoming connection from {} to {}",
2272                            send_back_addr, local_addr
2273                        );
2274                    }
2275                    e => {
2276                        info!("Event: {:?}", e);
2277                    }
2278                }
2279            }
2280        }
2281    }
2282}
2283
2284pub async fn pull(path: &Path, peer: &str, commit_id: &str, json: bool) -> Result<()> {
2285    let shard_dir = path.join(".shard");
2286    // pull can work on empty repo or existing one.
2287
2288    if !shard_dir.exists() {
2289        init(path, "flat", "zstd", "fixed", None, false, false)?;
2290    }
2291
2292    let store = Store::open(&shard_dir)?;
2293    let cipher = maybe_load_cipher(&shard_dir)?;
2294
2295    let mut node = shard_net::p2p::Node::new().await?;
2296
2297    // Parse peer multiaddr
2298    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2299    let peer_id = match multiaddr.iter().last() {
2300        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2301        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2302    };
2303
2304    // 1. Get Commit (sequential — single request)
2305    if !json {
2306        info!("Pulling commit {} from {}...", commit_id, peer);
2307    }
2308    let commit_data = node
2309        .request_manifest(&multiaddr, peer_id, commit_id.to_string())
2310        .await?;
2311    let hash = blake3::hash(&commit_data);
2312    if hash.to_hex().to_string() != commit_id {
2313        anyhow::bail!("Commit hash mismatch");
2314    }
2315    let chunk = crate::chunker::Chunk {
2316        hash,
2317        data: commit_data.clone(),
2318        offset: 0,
2319    };
2320    store.put_chunk(&chunk)?;
2321
2322    let commit: Commit = metadata::deserialize(&commit_data)?;
2323    if !json {
2324        info!("Got commit: {}", commit.message);
2325    }
2326
2327    // Fetch key rotation records for this commit's key chain
2328    let keys_dir = shard_dir.join("keys");
2329    if let Some(kid) = &commit.key_id {
2330        if keys_dir.join("records").exists() {
2331            if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2332                let missing_rotations: Vec<&KeyRotation> = chain
2333                    .iter()
2334                    .filter(|r| store.get_chunk(&r.rotation_id).is_err())
2335                    .collect();
2336                if !missing_rotations.is_empty() {
2337                    if !json {
2338                        info!(
2339                            "Fetching {} key rotation records from peer...",
2340                            missing_rotations.len()
2341                        );
2342                    }
2343                    let rot_requests: Vec<(String, shard_net::protocol::ShardRequest)> =
2344                        missing_rotations
2345                            .iter()
2346                            .map(|r| {
2347                                (
2348                                    r.rotation_id.clone(),
2349                                    shard_net::protocol::ShardRequest::GetChunk(
2350                                        r.rotation_id.clone(),
2351                                    ),
2352                                )
2353                            })
2354                            .collect();
2355                    if let Ok(rot_results) = node
2356                        .request_parallel(&multiaddr, peer_id, rot_requests)
2357                        .await
2358                    {
2359                        for (rot_id, rot_data) in &rot_results {
2360                            let rh = blake3::hash(rot_data);
2361                            if rh.to_hex().to_string() != *rot_id {
2362                                info!("Key rotation record hash mismatch (expected {}, got {}) — skipping", rot_id, rh.to_hex());
2363                                continue;
2364                            }
2365                            store.put_chunk(&crate::chunker::Chunk {
2366                                hash: rh,
2367                                data: rot_data.clone(),
2368                                offset: 0,
2369                            })?;
2370                        }
2371                        if !json {
2372                            info!("Key rotation records synced from peer.");
2373                        }
2374                    }
2375                }
2376            }
2377        }
2378    }
2379
2380    // Set repo_id from commit's public key so clones share the gossipsub topic
2381    if let Some(pk_hex) = &commit.public_key {
2382        let pk_bytes = hex::decode(pk_hex)?;
2383        let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
2384        let mut config = load_config(&shard_dir)?;
2385        config.insert("repo_id".to_string(), repo_id);
2386        save_config(&shard_dir, &config)?;
2387    }
2388
2389    // 2. Fetch all manifests in parallel
2390    let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
2391        .manifests
2392        .iter()
2393        .map(|id| {
2394            (
2395                id.clone(),
2396                shard_net::protocol::ShardRequest::GetManifest(id.clone()),
2397            )
2398        })
2399        .collect();
2400    let manifest_results = node
2401        .request_parallel(&multiaddr, peer_id, manifest_requests)
2402        .await?;
2403
2404    let mut all_chunk_ids: Vec<String> = Vec::new();
2405    let mut file_manifests: Vec<FileManifest> = Vec::new();
2406    // Map chunk_id -> compression type for verification in step 3
2407    let mut chunk_compression: HashMap<String, String> = HashMap::new();
2408
2409    for (manifest_id, manifest_data) in &manifest_results {
2410        let hash = blake3::hash(manifest_data);
2411        if hash.to_hex().to_string() != *manifest_id {
2412            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
2413        }
2414        let chunk = crate::chunker::Chunk {
2415            hash,
2416            data: manifest_data.clone(),
2417            offset: 0,
2418        };
2419        store.put_chunk(&chunk)?;
2420        let manifest: FileManifest = metadata::deserialize(manifest_data)?;
2421        if !json {
2422            info!(
2423                "Fetching file: {} (compression: {})",
2424                manifest.name, manifest.compression
2425            );
2426        }
2427        for cid in &manifest.chunks {
2428            chunk_compression.insert(cid.clone(), manifest.compression.clone());
2429        }
2430        all_chunk_ids.extend(manifest.chunks.clone());
2431        file_manifests.push(manifest);
2432    }
2433
2434    // 3. Resume: recover from partial directory if previous transfer was interrupted
2435    let partial = partial::PartialTransfer::new(&shard_dir, commit_id)?;
2436    let partial_chunks = partial.list_chunks()?;
2437    let mut recovered = 0usize;
2438    for chunk_id in &partial_chunks {
2439        if let Ok(data) = partial.load_chunk(chunk_id) {
2440            let hash = blake3::hash(&data);
2441            if hash.to_hex().to_string() == *chunk_id {
2442                // Recovered chunk matches — save to store
2443                let chunk = crate::chunker::Chunk {
2444                    hash,
2445                    data: data.clone(),
2446                    offset: 0,
2447                };
2448                if store.put_chunk(&chunk).is_ok() {
2449                    recovered += 1;
2450                }
2451            } else {
2452                // Corrupted partial chunk — remove and re-fetch
2453                let _ = partial.remove_chunk(chunk_id);
2454            }
2455        }
2456    }
2457    if recovered > 0 {
2458        info!("Recovered {} chunks from partial transfer", recovered);
2459    }
2460
2461    // 4. Fetch all missing chunks (not in store and not in partial)
2462    let needed_chunks: Vec<String> = all_chunk_ids
2463        .into_iter()
2464        .filter(|id| store.get_chunk(id).is_err())
2465        .collect();
2466
2467    if !needed_chunks.is_empty() {
2468        if !json {
2469            info!("Fetching {} chunks...", needed_chunks.len());
2470        }
2471        let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
2472            .iter()
2473            .map(|id| {
2474                (
2475                    id.clone(),
2476                    shard_net::protocol::ShardRequest::GetChunk(id.clone()),
2477                )
2478            })
2479            .collect();
2480        let chunk_results = node
2481            .request_parallel(&multiaddr, peer_id, chunk_requests)
2482            .await?;
2483        for (chunk_id, chunk_data) in &chunk_results {
2484            // Determine compression from the manifest this chunk belongs to
2485            let compression: Compression = chunk_compression
2486                .get(chunk_id)
2487                .map(|s| s.as_str())
2488                .unwrap_or("none")
2489                .parse()?;
2490            // Decrypt (if private) then decompress to verify the content hash
2491            let decrypted = match &cipher {
2492                Some(c) => c.decrypt(chunk_data)?,
2493                None => chunk_data.clone(),
2494            };
2495            let decompressed = compression.decompress(&decrypted)?;
2496            let hash = blake3::hash(&decompressed);
2497            if hash.to_hex().to_string() != *chunk_id {
2498                anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
2499            }
2500            // Store the data as received (encrypted for private repos)
2501            let chunk = crate::chunker::Chunk {
2502                hash,
2503                data: chunk_data.clone(),
2504                offset: 0,
2505            };
2506            store.put_chunk(&chunk)?;
2507            // Save to partial for resume support
2508            partial.save_chunk(chunk_id, chunk_data)?;
2509        }
2510    }
2511
2512    // 5. Reconstruct all files
2513    for manifest in &file_manifests {
2514        let compression = manifest.compression.parse::<Compression>()?;
2515        let mut file_data = Vec::new();
2516        for chunk_id in &manifest.chunks {
2517            let stored = store.get_chunk(chunk_id)?;
2518            let decrypted = match &cipher {
2519                Some(c) => c.decrypt(&stored)?,
2520                None => stored,
2521            };
2522            let decompressed = compression.decompress(&decrypted)?;
2523            file_data.extend_from_slice(&decompressed);
2524        }
2525        fs::write(path.join(&manifest.name), file_data)?;
2526        if !json {
2527            info!(
2528                "Reconstructed file: {} ({} bytes)",
2529                manifest.name, manifest.size
2530            );
2531        }
2532    }
2533
2534    // 6. Clean up partial transfer tracking
2535    partial.cleanup()?;
2536
2537    if json {
2538        info!(
2539            "{}",
2540            serde_json::to_string(&serde_json::json!({
2541                "status": "pull complete",
2542                "commit_id": commit_id,
2543            }))?
2544        );
2545    } else {
2546        info!("Pull complete.");
2547    }
2548    Ok(())
2549}
2550
2551pub fn transfer_list(path: &Path, json: bool) -> Result<()> {
2552    let shard_dir = path.join(".shard");
2553    if !shard_dir.exists() {
2554        anyhow::bail!("not a shard repository (run `shard init` first)");
2555    }
2556    let transfers = partial::list_incomplete_transfers(&shard_dir)?;
2557    if json {
2558        info!("{}", serde_json::to_string(&transfers)?);
2559    } else {
2560        if transfers.is_empty() {
2561            info!("No incomplete transfers.");
2562        } else {
2563            for t in &transfers {
2564                info!("Incomplete transfer: {}", t);
2565            }
2566        }
2567    }
2568    Ok(())
2569}
2570
2571pub fn transfer_remove(path: &Path, commit_id: &str) -> Result<()> {
2572    let shard_dir = path.join(".shard");
2573    if !shard_dir.exists() {
2574        anyhow::bail!("not a shard repository (run `shard init` first)");
2575    }
2576    partial::remove_transfer(&shard_dir, commit_id)?;
2577    info!("Removed transfer tracking for {}", commit_id);
2578    Ok(())
2579}
2580
2581pub async fn push(path: &Path, peer: &str, json: bool) -> Result<()> {
2582    let shard_dir = path.join(".shard");
2583    if !shard_dir.exists() {
2584        anyhow::bail!("not a shard repository (run `shard init` first)");
2585    }
2586
2587    let (_, head_id) = branch::resolve_head(&shard_dir)?;
2588    let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
2589
2590    let store = Store::open(&shard_dir)?;
2591
2592    // Collect all reachable objects
2593    let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
2594        std::collections::BTreeMap::new();
2595
2596    // Walk commits
2597    let mut seen = std::collections::HashSet::new();
2598    let mut stack = vec![head_id.clone()];
2599    while let Some(cid) = stack.pop() {
2600        if !seen.insert(cid.clone()) {
2601            continue;
2602        }
2603        if let Ok(data) = store.get_chunk(&cid) {
2604            objects.insert(cid, data.clone());
2605            if let Ok(commit) = metadata::deserialize::<Commit>(&data) {
2606                // Include key rotation records for this commit's key chain
2607                if let Some(kid) = &commit.key_id {
2608                    let keys_dir = shard_dir.join("keys");
2609                    if let Ok(chain) = keychain::collect_rotation_chain(&keys_dir, kid) {
2610                        for rot in &chain {
2611                            let rj = serde_json::to_vec(rot)?;
2612                            let rh = blake3::hash(&rj);
2613                            if !store.has_chunk(rh.to_hex().as_ref()) {
2614                                store.put_chunk(&crate::chunker::Chunk {
2615                                    hash: rh,
2616                                    data: rj.clone(),
2617                                    offset: 0,
2618                                })?;
2619                            }
2620                            objects.insert(rot.rotation_id.clone(), rj);
2621                        }
2622                    }
2623                }
2624                for mid in &commit.manifests {
2625                    if let Ok(manifest_data) = store.get_chunk(mid) {
2626                        objects.insert(mid.clone(), manifest_data.clone());
2627                        if let Ok(manifest) = metadata::deserialize::<FileManifest>(&manifest_data)
2628                        {
2629                            for cid in &manifest.chunks {
2630                                if let Ok(chunk_data) = store.get_chunk(cid) {
2631                                    objects.insert(cid.clone(), chunk_data);
2632                                }
2633                            }
2634                        }
2635                    }
2636                }
2637                for parent in &commit.parents {
2638                    stack.push(parent.clone());
2639                }
2640            }
2641        }
2642    }
2643
2644    if !json {
2645        info!(
2646            "Pushing {} objects ({} bytes)...",
2647            objects.len(),
2648            objects.values().map(|v| v.len() as u64).sum::<u64>()
2649        );
2650    }
2651
2652    // Connect and send all objects
2653    let mut node = shard_net::p2p::Node::new().await?;
2654    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
2655    let peer_id = match multiaddr.iter().last() {
2656        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
2657        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
2658    };
2659
2660    for (id, data) in &objects {
2661        node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
2662            .await?;
2663    }
2664
2665    if json {
2666        info!(
2667            "{}",
2668            serde_json::to_string(&serde_json::json!({
2669                "status": "push complete",
2670                "objects": objects.len(),
2671            }))?
2672        );
2673    } else {
2674        info!("Push complete ({} objects).", objects.len());
2675    }
2676    Ok(())
2677}
2678
2679/// Rotate the signing key: generates a new ed25519 keypair, archives the old
2680/// one, and persists a signed rotation record.
2681pub fn key_rotate(path: &Path) -> Result<()> {
2682    let shard_dir = path.join(".shard");
2683    if !shard_dir.exists() {
2684        anyhow::bail!("not a shard repository (run `shard init` first)");
2685    }
2686    let keys_dir = shard_dir.join("keys");
2687    let rotation = keychain::rotate_signing_key(&keys_dir)?;
2688
2689    // Also update global keyring
2690    if let Some(global) = global_keys_dir() {
2691        fs::create_dir_all(&global).ok();
2692        if let Ok(new_keys) = KeyPair::load(&keys_dir) {
2693            let _ = new_keys.save(&global);
2694        }
2695    }
2696
2697    // Store rotation record as a content-addressed chunk in the DAG
2698    let store = Store::open(&shard_dir)?;
2699    let json = serde_json::to_vec(&rotation)?;
2700    let hash = blake3::hash(&json);
2701    if !store.has_chunk(hash.to_hex().as_ref()) {
2702        store.put_chunk(&crate::chunker::Chunk {
2703            hash,
2704            data: json,
2705            offset: 0,
2706        })?;
2707    }
2708
2709    // Replicate all rotation records as chunks for P2P availability
2710    let rotations = keychain::load_rotations(&keys_dir)?;
2711    for rot in &rotations {
2712        let rj = serde_json::to_vec(rot)?;
2713        let rh = blake3::hash(&rj);
2714        if !store.has_chunk(rh.to_hex().as_ref()) {
2715            store.put_chunk(&crate::chunker::Chunk {
2716                hash: rh,
2717                data: rj,
2718                offset: 0,
2719            })?;
2720        }
2721    }
2722
2723    info!(
2724        "Key rotated: {} -> {}",
2725        rotation.old_key_id, rotation.new_key_id
2726    );
2727    info!("Rotation record: {} (stored in DAG)", rotation.rotation_id);
2728    Ok(())
2729}
2730
2731/// List all keys in the keychain with their validity info.
2732pub fn key_list(path: &Path, json: bool) -> Result<()> {
2733    let shard_dir = path.join(".shard");
2734    if !shard_dir.exists() {
2735        anyhow::bail!("not a shard repository (run `shard init` first)");
2736    }
2737    let keys_dir = shard_dir.join("keys");
2738    let records = keychain::load_records(&keys_dir)?;
2739    let current_id = keychain::get_current_key_id(&keys_dir)?;
2740
2741    if json {
2742        info!(
2743            "{}",
2744            serde_json::to_string_pretty(&serde_json::json!({
2745                "current": current_id,
2746                "records": &records,
2747            }))?
2748        );
2749    } else {
2750        info!("Current key: {}", current_id);
2751        info!("Key history:");
2752        for record in &records {
2753            let marker = if record.key_id == current_id {
2754                " (active)"
2755            } else {
2756                ""
2757            };
2758            info!(
2759                "  {}  created_at={}{}",
2760                record.key_id, record.created_at, marker
2761            );
2762            if let Some(prev) = &record.previous_key_id {
2763                info!("    previous: {}", prev);
2764            }
2765        }
2766    }
2767    Ok(())
2768}
2769
2770/// Verify the integrity of the keychain: check every rotation's signature.
2771pub fn key_verify(path: &Path, json: bool) -> Result<()> {
2772    let shard_dir = path.join(".shard");
2773    if !shard_dir.exists() {
2774        anyhow::bail!("not a shard repository (run `shard init` first)");
2775    }
2776    let keys_dir = shard_dir.join("keys");
2777    let errors = keychain::verify_keychain(&keys_dir)?;
2778
2779    if json {
2780        info!(
2781            "{}",
2782            serde_json::to_string(&serde_json::json!({
2783                "verified": errors.is_empty(),
2784                "errors": errors,
2785            }))?
2786        );
2787    } else {
2788        if errors.is_empty() {
2789            info!("Keychain verification successful.");
2790        } else {
2791            for err in &errors {
2792                error!("Keychain error: {}", err);
2793            }
2794            anyhow::bail!("Keychain verification failed ({} errors).", errors.len());
2795        }
2796    }
2797    Ok(())
2798}