Skip to main content

shard_core/
lib.rs

1pub mod branch;
2pub mod chunker;
3pub mod commit;
4pub mod compression;
5pub mod index;
6pub mod manifest;
7pub mod store;
8pub mod wal;
9
10use crate::commit::Commit;
11use crate::compression::Compression;
12use crate::index::Index;
13use crate::manifest::FileManifest;
14use crate::store::Store;
15use anyhow::Result;
16use ed25519_dalek::{Signer, Verifier};
17use serde::Serialize;
18use shard_crypto::KeyPair;
19use shard_net::libp2p::futures::StreamExt;
20use std::collections::HashMap;
21use std::fs;
22use std::io::Write;
23use std::path::Path;
24use std::time::{Duration, SystemTime, UNIX_EPOCH};
25use tracing::{error, info};
26
27pub fn init(
28    path: &Path,
29    backend: &str,
30    compression_algo: &str,
31    chunker_mode: &str,
32    chunk_size: Option<u64>,
33) -> Result<()> {
34    let shard_dir = path.join(".shard");
35    if shard_dir.exists() {
36        anyhow::bail!(
37            "repository already initialized at {} (run `shard status` to confirm)",
38            shard_dir.display()
39        );
40    }
41    fs::create_dir_all(shard_dir.join("objects"))?;
42    fs::create_dir_all(shard_dir.join("keys"))?;
43    fs::create_dir_all(shard_dir.join("refs").join("heads"))?;
44    branch::set_head_branch(&shard_dir, "main")?;
45
46    let keys = KeyPair::generate();
47    keys.save(&shard_dir.join("keys"))?;
48
49    // Generate a deterministic repo identity from the public key
50    // (same key = same repo_id, so clones share the gossipsub topic)
51    let pubkey = fs::read(shard_dir.join("keys/public.key"))?;
52    let repo_id = blake3::hash(&pubkey).to_hex().to_string();
53    let mut config = load_config(&shard_dir)?;
54    config.insert("repo_id".to_string(), repo_id);
55    config.insert("storage_backend".to_string(), backend.to_string());
56    config.insert("compression".to_string(), compression_algo.to_string());
57    config.insert("chunker_mode".to_string(), chunker_mode.to_string());
58    match chunker_mode {
59        "rabin" => {
60            let chunk_size = chunk_size.unwrap_or(4_194_304);
61            let min = chunk_size / 4;
62            let max = chunk_size * 2;
63            config.insert("chunk_min".to_string(), min.to_string());
64            config.insert("chunk_avg".to_string(), chunk_size.to_string());
65            config.insert("chunk_max".to_string(), max.to_string());
66        }
67        _ => {
68            let cs = chunk_size.unwrap_or(4_194_304);
69            config.insert("chunk_size".to_string(), cs.to_string());
70        }
71    }
72    save_config(&shard_dir, &config)?;
73
74    let chunker_desc = if chunker_mode == "rabin" {
75        format!(
76            "rabin (avg {} bytes)",
77            config.get("chunk_avg").unwrap_or(&"4 MiB".to_string())
78        )
79    } else {
80        format!(
81            "fixed ({} bytes)",
82            config.get("chunk_size").unwrap_or(&"4 MiB".to_string())
83        )
84    };
85    info!(
86        "Initialized empty Shard repository in {} with {} storage (compression: {}, chunking: {})",
87        shard_dir.display(),
88        backend,
89        compression_algo,
90        chunker_desc,
91    );
92    Ok(())
93}
94
/// Computes the repository-relative name for `file_path`.
///
/// Both paths are canonicalized when possible; a path that cannot be
/// canonicalized is used exactly as given. If the file lies under the
/// repository root, the remainder of its path is returned. Otherwise the
/// result is the final component of `file_path`, or an empty string when it
/// has none.
fn relative_path(repo_root: &Path, file_path: &Path) -> String {
    let resolve = |p: &Path| p.canonicalize().unwrap_or_else(|_| p.to_path_buf());
    let root = resolve(repo_root);
    let target = resolve(file_path);

    match target.strip_prefix(&root) {
        Ok(inside) => inside.to_string_lossy().to_string(),
        Err(_) => match file_path.file_name() {
            Some(base) => base.to_string_lossy().to_string(),
            None => String::new(),
        },
    }
}
111
112fn add_file(
113    repo_root: &Path,
114    file_path: &Path,
115    store: &Store,
116    index: &mut Index,
117    compression: &Compression,
118    chunker_mode: &chunker::ChunkerMode,
119) -> Result<()> {
120    let file = fs::File::open(file_path)?;
121    let mut chunker = match chunker_mode {
122        chunker::ChunkerMode::Fixed { chunk_size } => {
123            chunker::Chunker::new_fixed(Box::new(file), *chunk_size)
124        }
125        chunker::ChunkerMode::Rabin { min, avg, max } => {
126            chunker::Chunker::new_rabin(Box::new(file), *min, *avg, *max)
127        }
128    };
129    let mut chunk_hashes = Vec::new();
130    let mut total_size = 0;
131
132    while let Some(chunk) = chunker.next_chunk()? {
133        let hash = chunk.hash;
134        let compressed_data = compression.compress(&chunk.data)?;
135        let stored = crate::chunker::Chunk {
136            hash,
137            data: compressed_data,
138            offset: chunk.offset,
139        };
140        store.put_chunk(&stored)?;
141        chunk_hashes.push(hash.to_hex().to_string());
142        total_size += chunk.data.len() as u64;
143    }
144
145    let name = relative_path(repo_root, file_path);
146    let manifest = FileManifest {
147        name: name.clone(),
148        size: total_size,
149        chunks: chunk_hashes,
150        content_type: None,
151        compression: compression.as_str().to_string(),
152    };
153
154    index.files.insert(name.clone(), manifest);
155    info!("Added {} ({})", name, total_size);
156    Ok(())
157}
158
159pub fn recover(path: &Path) -> Result<()> {
160    let shard_dir = path.join(".shard");
161    if !shard_dir.exists() {
162        anyhow::bail!("not a shard repository (run `shard init` first)");
163    }
164    wal::recover(&shard_dir)?;
165    info!("Recovery complete.");
166    Ok(())
167}
168
169pub fn add(path: &Path, file_path: &Path) -> Result<()> {
170    let shard_dir = path.join(".shard");
171    if !shard_dir.exists() {
172        anyhow::bail!("not a shard repository (run `shard init` first)");
173    }
174
175    wal::recover(&shard_dir)?;
176
177    let config = load_config(&shard_dir)?;
178    let compression: Compression = config
179        .get("compression")
180        .map(|s| s.as_str())
181        .unwrap_or("zstd")
182        .parse()?;
183
184    let chunker_mode = chunker::ChunkerMode::from_config(&config);
185
186    let store = Store::open(&shard_dir)?;
187    let mut index = Index::load(&shard_dir.join("index"))?;
188
189    if file_path.is_dir() {
190        for entry in walkdir::WalkDir::new(file_path)
191            .into_iter()
192            .filter_entry(|e| {
193                e.file_name()
194                    .to_str()
195                    .map(|s| !s.starts_with('.'))
196                    .unwrap_or(false)
197            })
198        {
199            let entry = entry?;
200            if entry.file_type().is_file() {
201                add_file(
202                    path,
203                    entry.path(),
204                    &store,
205                    &mut index,
206                    &compression,
207                    &chunker_mode,
208                )?;
209            }
210        }
211    } else {
212        add_file(
213            path,
214            file_path,
215            &store,
216            &mut index,
217            &compression,
218            &chunker_mode,
219        )?;
220    }
221
222    index.save(&shard_dir.join("index"))?;
223    Ok(())
224}
225
226pub fn commit(path: &Path, message: &str, author: &str) -> Result<()> {
227    let shard_dir = path.join(".shard");
228    if !shard_dir.exists() {
229        anyhow::bail!("not a shard repository (run `shard init` first)");
230    }
231
232    // Recover from any previous crash before mutating
233    wal::recover(&shard_dir)?;
234
235    let store = Store::open(&shard_dir)?;
236    let mut index = Index::load(&shard_dir.join("index"))?;
237
238    if index.files.is_empty() {
239        anyhow::bail!("nothing to commit (stage files with `shard add` first)");
240    }
241
242    let head_path = shard_dir.join("HEAD");
243
244    // WAL: back up pre-commit state
245    let wal = wal::Wal::new(&shard_dir);
246    let head_backup = fs::read_to_string(&head_path).ok();
247    let index_backup = fs::read(shard_dir.join("index"))?;
248    wal.append(&wal::WalEntry::CommitBegin {
249        head_backup,
250        index_backup,
251    })?;
252
253    // 1. Store manifests
254    let mut manifest_ids = Vec::new();
255    for manifest in index.files.values() {
256        let json = serde_json::to_vec(manifest)?;
257        let hash = blake3::hash(&json);
258        let chunk = crate::chunker::Chunk {
259            hash,
260            data: json,
261            offset: 0,
262        };
263        store.put_chunk(&chunk)?;
264        manifest_ids.push(hash.to_hex().to_string());
265    }
266    manifest_ids.sort();
267
268    // 2. Get parent
269    let mut parents = Vec::new();
270    let (current_branch, parent_id) = branch::resolve_head(&shard_dir)?;
271    if let Some(pid) = parent_id {
272        parents.push(pid);
273    }
274
275    // 3. Create commit
276    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
277    let keys = KeyPair::load(&shard_dir.join("keys"))?;
278    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
279    let mut commit = Commit {
280        commit_id: String::new(),
281        parents,
282        manifests: manifest_ids,
283        author: author.to_string(),
284        message: message.to_string(),
285        timestamp,
286        public_key: Some(public_key_hex),
287        signature: None,
288    };
289
290    // 4. Sign
291    let signing_key = keys.signing_key;
292    let json_unsigned = serde_json::to_vec(&commit)?;
293    let signature = signing_key.sign(&json_unsigned);
294    commit.signature = Some(hex::encode(signature.to_bytes()));
295
296    // 5. Store commit
297    let json_final = serde_json::to_vec(&commit)?;
298    let hash = blake3::hash(&json_final);
299    let chunk = crate::chunker::Chunk {
300        hash,
301        data: json_final,
302        offset: 0,
303    };
304    store.put_chunk(&chunk)?;
305
306    // 6. Update HEAD and branch ref
307    let commit_id = hash.to_hex().to_string();
308    if let Some(ref branch_name) = current_branch {
309        branch::update_branch_ref(&shard_dir, branch_name, &commit_id)?;
310        branch::set_head_branch(&shard_dir, branch_name)?;
311    } else {
312        fs::write(&head_path, &commit_id)?;
313    }
314
315    // 7. Clear index
316    index.files.clear();
317    index.save(&shard_dir.join("index"))?;
318
319    // WAL: mark commit complete
320    wal.append(&wal::WalEntry::CommitEnd)?;
321    wal.truncate()?;
322
323    info!("Committed {} ({})", commit_id, message);
324    Ok(())
325}
326
327pub fn verify(path: &Path, commit_id: &str, json: bool) -> Result<()> {
328    let shard_dir = path.join(".shard");
329    if !shard_dir.exists() {
330        anyhow::bail!("not a shard repository (run `shard init` first)");
331    }
332
333    if commit_id.len() < 2 {
334        anyhow::bail!("invalid commit id (too short: need at least 2 characters)");
335    }
336    let store = Store::open(&shard_dir)?;
337    let commit_data = store.get_chunk(commit_id)?;
338    let commit: Commit = serde_json::from_slice(&commit_data)?;
339
340    let mut sig_verified = false;
341    let mut files_checked = 0u64;
342
343    if let Some(sig_hex) = &commit.signature {
344        let verifying_key = if let Some(pk_hex) = &commit.public_key {
345            let pk_bytes = hex::decode(pk_hex)?;
346            ed25519_dalek::VerifyingKey::from_bytes(pk_bytes.as_slice().try_into()?)?
347        } else {
348            let pub_key_path = shard_dir.join("keys/public.key");
349            let pub_bytes = fs::read(pub_key_path)?;
350            ed25519_dalek::VerifyingKey::from_bytes(pub_bytes.as_slice().try_into()?)?
351        };
352
353        let mut unsigned_commit = commit.clone();
354        unsigned_commit.signature = None;
355        let json_unsigned = serde_json::to_vec(&unsigned_commit)?;
356
357        let sig_bytes = hex::decode(sig_hex)?;
358        let signature = ed25519_dalek::Signature::from_bytes(sig_bytes.as_slice().try_into()?);
359
360        verifying_key.verify(&json_unsigned, &signature)?;
361        sig_verified = true;
362        if !json {
363            info!("Signature verified.");
364        }
365    } else if !json {
366        info!("Warning: Commit is unsigned.");
367    }
368
369    for manifest_id in &commit.manifests {
370        let manifest_data = store.get_chunk(manifest_id)?;
371        let hash = blake3::hash(&manifest_data);
372        if hash.to_hex().to_string() != *manifest_id {
373            anyhow::bail!("manifest object hash mismatch for manifest '{}': content does not match stored hash. The object store may be corrupted.", manifest_id);
374        }
375
376        let manifest: FileManifest = serde_json::from_slice(&manifest_data)?;
377        let compression = manifest.compression.parse::<Compression>()?;
378        if !json {
379            info!(
380                "Verifying file: {} (compression: {})",
381                manifest.name, manifest.compression
382            );
383        }
384
385        for chunk_id in &manifest.chunks {
386            let chunk_data = store.get_chunk(chunk_id)?;
387            let decompressed = compression.decompress(&chunk_data)?;
388            let hash = blake3::hash(&decompressed);
389            if hash.to_hex().to_string() != *chunk_id {
390                anyhow::bail!("chunk hash mismatch for '{}': content does not match stored hash (expected {}, got {}). File may be corrupted.", manifest.name, chunk_id, hash.to_hex());
391            }
392        }
393        files_checked += 1;
394    }
395
396    if json {
397        info!(
398            "{}",
399            serde_json::to_string(&serde_json::json!({
400                "commit_id": commit_id,
401                "verified": true,
402                "signature_verified": sig_verified,
403                "files_checked": files_checked,
404            }))?
405        );
406    } else {
407        info!("Verification successful.");
408    }
409    Ok(())
410}
411
412fn load_commit(store: &Store, commit_id: &str) -> Result<Commit> {
413    if commit_id.len() < 2 {
414        anyhow::bail!(
415            "commit id too short (got {} chars, need at least 2): '{}'",
416            commit_id.len(),
417            commit_id
418        );
419    };
420    let data = store.get_chunk(commit_id)?;
421    let mut commit: Commit = serde_json::from_slice(&data)?;
422    commit.commit_id = commit_id.to_string();
423    Ok(commit)
424}
425
426#[derive(Serialize)]
427pub struct LogEntry {
428    pub commit_id: String,
429    pub parents: Vec<String>,
430    pub manifests: Vec<String>,
431    pub author: String,
432    pub message: String,
433    pub timestamp: u64,
434    pub signature: Option<String>,
435}
436
437impl From<Commit> for LogEntry {
438    fn from(c: Commit) -> Self {
439        LogEntry {
440            commit_id: c.commit_id,
441            parents: c.parents,
442            manifests: c.manifests,
443            author: c.author,
444            message: c.message,
445            timestamp: c.timestamp,
446            signature: c.signature,
447        }
448    }
449}
450
451pub fn log_cmd(path: &Path, json: bool) -> Result<()> {
452    let shard_dir = path.join(".shard");
453    if !shard_dir.exists() {
454        anyhow::bail!("not a shard repository (run `shard init` first)");
455    }
456
457    let store = Store::open(&shard_dir)?;
458
459    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
460    let head = head_commit
461        .ok_or_else(|| anyhow::anyhow!("no commits yet (run `shard commit` after adding files)"))?;
462
463    let mut entries: Vec<LogEntry> = Vec::new();
464    let mut seen = std::collections::HashSet::new();
465    let mut stack = vec![head];
466
467    while let Some(cid) = stack.pop() {
468        if !seen.insert(cid.clone()) {
469            continue;
470        }
471        let commit = load_commit(&store, &cid)?;
472        for parent in &commit.parents {
473            stack.push(parent.clone());
474        }
475        entries.push(commit.into());
476    }
477
478    if json {
479        info!("{}", serde_json::to_string_pretty(&entries)?);
480    } else {
481        for entry in &entries {
482            let ts = {
483                let secs = entry.timestamp as i64;
484                let tm = time::OffsetDateTime::from_unix_timestamp(secs)
485                    .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
486                tm.format(&time::format_description::well_known::Rfc3339)
487                    .unwrap_or_else(|_| entry.timestamp.to_string())
488            };
489            info!("commit {}", entry.commit_id);
490            if !entry.parents.is_empty() {
491                info!("parents: {}", entry.parents.join(" "));
492            }
493            info!("author: {}", entry.author);
494            info!("date:   {}", ts);
495            info!("");
496            for line in entry.message.lines() {
497                info!("    {}", line);
498            }
499            info!("");
500        }
501    }
502
503    Ok(())
504}
505
506pub fn checkout(path: &Path, target: &str, json: bool) -> Result<()> {
507    let shard_dir = path.join(".shard");
508    if !shard_dir.exists() {
509        anyhow::bail!("not a shard repository (run `shard init` first)");
510    }
511
512    let store = Store::open(&shard_dir)?;
513
514    // Resolve target: branch name or commit id
515    let branch_path = shard_dir.join("refs").join("heads").join(target);
516    let commit_id = if branch_path.exists() {
517        let id = fs::read_to_string(&branch_path)?.trim().to_string();
518        branch::set_head_branch(&shard_dir, target)?;
519        id
520    } else {
521        branch::set_head_commit(&shard_dir, target)?;
522        target.to_string()
523    };
524
525    let commit = load_commit(&store, &commit_id)?;
526    let mut files = Vec::new();
527
528    for manifest_id in &commit.manifests {
529        let data = store.get_chunk(manifest_id)?;
530        let hash = blake3::hash(&data);
531        if hash.to_hex().to_string() != *manifest_id {
532            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
533        }
534        let manifest: FileManifest = serde_json::from_slice(&data)?;
535        let compression = manifest.compression.parse::<Compression>()?;
536        if !json {
537            info!(
538                "Checking out file: {} (compression: {})",
539                manifest.name, manifest.compression
540            );
541        }
542
543        let mut file_data = Vec::new();
544        for chunk_id in &manifest.chunks {
545            let chunk_data = store.get_chunk(chunk_id)?;
546            let decompressed = compression.decompress(&chunk_data)?;
547            file_data.extend_from_slice(&decompressed);
548        }
549        fs::write(path.join(&manifest.name), file_data)?;
550        if !json {
551            info!("  -> {}", manifest.name);
552        }
553        files.push(manifest.name);
554    }
555
556    if json {
557        info!(
558            "{}",
559            serde_json::to_string(&serde_json::json!({
560                "commit_id": commit_id,
561                "files": files,
562            }))?
563        );
564    } else {
565        info!("Checkout complete.");
566    }
567    Ok(())
568}
569
570pub fn status(path: &Path, json: bool) -> Result<()> {
571    let shard_dir = path.join(".shard");
572    if !shard_dir.exists() {
573        anyhow::bail!("not a shard repository (run `shard init` first)");
574    }
575
576    let (current_branch, head_commit) = branch::resolve_head(&shard_dir)?;
577    let mut commit_id: Option<String> = None;
578    if let Some(cid) = head_commit {
579        commit_id = Some(cid);
580        if !json {
581            if let Some(ref branch) = current_branch {
582                info!("On branch: {}", branch);
583            } else {
584                info!("HEAD detached at {}", commit_id.as_ref().unwrap());
585            }
586        }
587    } else if !json {
588        info!("No commits yet.");
589    }
590
591    let index = Index::load(&shard_dir.join("index"))?;
592    let staged: Vec<String> = index.files.keys().cloned().collect();
593    if !json {
594        if staged.is_empty() {
595            info!("Nothing staged.");
596        } else {
597            info!("\nStaged files:");
598            for name in &staged {
599                info!("  {} (to be committed)", name);
600            }
601        }
602    }
603
604    let store = Store::open(&shard_dir)?;
605    let mut deleted = Vec::new();
606    let tracked_names: std::collections::HashSet<String> = if let Some(head) = &commit_id {
607        let mut names = std::collections::HashSet::new();
608        if let Ok(commit) = load_commit(&store, head) {
609            for manifest_id in &commit.manifests {
610                if let Ok(data) = store.get_chunk(manifest_id) {
611                    if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&data) {
612                        let file_path = path.join(&manifest.name);
613                        if !file_path.exists() {
614                            deleted.push(manifest.name.clone());
615                        }
616                        names.insert(manifest.name);
617                    }
618                }
619            }
620        }
621        names
622    } else {
623        std::collections::HashSet::new()
624    };
625
626    if !json && !deleted.is_empty() {
627        info!("\nDeleted files:");
628        for name in &deleted {
629            info!("  {} (deleted)", name);
630        }
631    }
632
633    let mut untracked = Vec::new();
634    if let Ok(entries) = fs::read_dir(path) {
635        for entry in entries.flatten() {
636            if let Ok(ftype) = entry.file_type() {
637                if ftype.is_file() {
638                    let name = entry.file_name().to_string_lossy().to_string();
639                    if !name.starts_with('.')
640                        && !index.files.contains_key(&name)
641                        && !tracked_names.contains(&name)
642                    {
643                        untracked.push(name);
644                    }
645                }
646            }
647        }
648    }
649    if !json && !untracked.is_empty() {
650        info!("\nUntracked files:");
651        for name in &untracked {
652            info!("  {}", name);
653        }
654    }
655
656    if json {
657        info!(
658            "{}",
659            serde_json::to_string(&serde_json::json!({
660                "commit": commit_id,
661                "staged": staged,
662                "deleted": deleted,
663                "untracked": untracked,
664            }))?
665        );
666    }
667
668    Ok(())
669}
670
671fn load_config(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
672    let config_path = shard_dir.join("config.json");
673    if config_path.exists() {
674        let data = fs::read(&config_path)?;
675        Ok(serde_json::from_slice(&data)?)
676    } else {
677        Ok(std::collections::BTreeMap::new())
678    }
679}
680
681fn save_config(
682    shard_dir: &Path,
683    config: &std::collections::BTreeMap<String, String>,
684) -> Result<()> {
685    let data = serde_json::to_string_pretty(config)?;
686    fs::write(shard_dir.join("config.json"), data)?;
687    Ok(())
688}
689
690pub fn config_get(path: &Path, key: Option<&str>) -> Result<()> {
691    let shard_dir = path.join(".shard");
692    if !shard_dir.exists() {
693        anyhow::bail!("not a shard repository (run `shard init` first)");
694    }
695    let config = load_config(&shard_dir)?;
696    if let Some(key) = key {
697        match config.get(key) {
698            Some(value) => info!("{} = {}", key, value),
699            None => anyhow::bail!("config key not found: {}", key),
700        }
701    } else {
702        for (k, v) in &config {
703            info!("{} = {}", k, v);
704        }
705    }
706    Ok(())
707}
708
709pub fn config_set(path: &Path, key: &str, value: &str) -> Result<()> {
710    let shard_dir = path.join(".shard");
711    if !shard_dir.exists() {
712        anyhow::bail!("not a shard repository (run `shard init` first)");
713    }
714    let mut config = load_config(&shard_dir)?;
715    config.insert(key.to_string(), value.to_string());
716    save_config(&shard_dir, &config)?;
717    info!("{} = {}", key, value);
718    Ok(())
719}
720
721fn load_tags(shard_dir: &Path) -> Result<std::collections::BTreeMap<String, String>> {
722    let tags_path = shard_dir.join("tags.json");
723    if tags_path.exists() {
724        let data = fs::read(&tags_path)?;
725        Ok(serde_json::from_slice(&data)?)
726    } else {
727        Ok(std::collections::BTreeMap::new())
728    }
729}
730
731fn save_tags(shard_dir: &Path, tags: &std::collections::BTreeMap<String, String>) -> Result<()> {
732    let data = serde_json::to_string_pretty(tags)?;
733    fs::write(shard_dir.join("tags.json"), data)?;
734    Ok(())
735}
736
737pub fn tag_add(path: &Path, name: &str, commit_id: &str) -> Result<()> {
738    let shard_dir = path.join(".shard");
739    if !shard_dir.exists() {
740        anyhow::bail!("not a shard repository (run `shard init` first)");
741    }
742    // Verify commit exists
743    let store = Store::open(&shard_dir)?;
744    load_commit(&store, commit_id)?;
745    let mut tags = load_tags(&shard_dir)?;
746    tags.insert(name.to_string(), commit_id.to_string());
747    save_tags(&shard_dir, &tags)?;
748    info!("Tagged '{}' -> {}", name, commit_id);
749    Ok(())
750}
751
752pub fn branch_create(path: &Path, name: &str, commit_id: Option<&str>) -> Result<()> {
753    let shard_dir = path.join(".shard");
754    if !shard_dir.exists() {
755        anyhow::bail!("not a shard repository (run `shard init` first)");
756    }
757    let id = match commit_id {
758        Some(cid) => cid.to_string(),
759        None => {
760            let (_, head) = branch::resolve_head(&shard_dir)?;
761            head.ok_or_else(|| anyhow::anyhow!("No commits yet — cannot create branch"))?
762        }
763    };
764    // Verify commit exists
765    let store = Store::open(&shard_dir)?;
766    load_commit(&store, &id)?;
767    branch::create_branch(&shard_dir, name, &id)
768}
769
770pub fn branch_delete(path: &Path, name: &str) -> Result<()> {
771    let shard_dir = path.join(".shard");
772    if !shard_dir.exists() {
773        anyhow::bail!("not a shard repository (run `shard init` first)");
774    }
775    branch::delete_branch(&shard_dir, name)
776}
777
778pub fn branch_list(path: &Path) -> Result<()> {
779    let shard_dir = path.join(".shard");
780    if !shard_dir.exists() {
781        anyhow::bail!("not a shard repository (run `shard init` first)");
782    }
783    let (current, branches) = branch::list_branches(&shard_dir)?;
784    if branches.is_empty() {
785        info!("No branches.");
786        return Ok(());
787    }
788    for (name, commit_id) in &branches {
789        let prefix = if current.as_deref() == Some(name) {
790            "* "
791        } else {
792            "  "
793        };
794        info!(
795            "{}{} ({})",
796            prefix,
797            name,
798            &commit_id[..8.min(commit_id.len())]
799        );
800    }
801    Ok(())
802}
803
804pub fn merge(path: &Path, branch: &str, message: &str, author: &str) -> Result<()> {
805    let shard_dir = path.join(".shard");
806    if !shard_dir.exists() {
807        anyhow::bail!("not a shard repository (run `shard init` first)");
808    }
809
810    let store = Store::open(&shard_dir)?;
811
812    // Resolve current HEAD
813    let (current_branch, current_id) = branch::resolve_head(&shard_dir)?;
814    let current_id =
815        current_id.ok_or_else(|| anyhow::anyhow!("No commits yet — nothing to merge into"))?;
816
817    // Resolve source branch
818    let source_id = branch::resolve_rev(&shard_dir, branch)?;
819    if source_id == current_id {
820        anyhow::bail!("Already up to date — source is the same commit as HEAD");
821    }
822
823    // Load both commits
824    let current_commit = load_commit(&store, &current_id)?;
825    let source_commit = load_commit(&store, &source_id)?;
826
827    // Load manifests from both sides
828    let mut merged_manifests: std::collections::HashMap<String, (String, Vec<String>)> =
829        std::collections::HashMap::new();
830
831    for manifest_id in &current_commit.manifests {
832        let data = store.get_chunk(manifest_id)?;
833        let manifest: FileManifest = serde_json::from_slice(&data)?;
834        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
835    }
836
837    for manifest_id in &source_commit.manifests {
838        let data = store.get_chunk(manifest_id)?;
839        let manifest: FileManifest = serde_json::from_slice(&data)?;
840        merged_manifests.insert(manifest.name.clone(), (manifest.name, manifest.chunks));
841    }
842
843    // Store merged manifests
844    let mut merged_manifest_ids = Vec::new();
845    for (name, chunks) in merged_manifests.values() {
846        let compression = Compression::None;
847        let manifest = FileManifest {
848            name: name.clone(),
849            size: 0,
850            chunks: chunks.clone(),
851            content_type: None,
852            compression: compression.as_str().to_string(),
853        };
854        let json = serde_json::to_vec(&manifest)?;
855        let hash = blake3::hash(&json);
856        store.put_chunk(&crate::chunker::Chunk {
857            hash,
858            data: json,
859            offset: 0,
860        })?;
861        merged_manifest_ids.push(hash.to_hex().to_string());
862    }
863    merged_manifest_ids.sort();
864
865    // Create merge commit
866    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
867    let keys = KeyPair::load(&shard_dir.join("keys"))?;
868    let public_key_hex = hex::encode(keys.verifying_key.to_bytes());
869    let parents = vec![current_id.clone(), source_id.clone()];
870    let mut commit = Commit {
871        commit_id: String::new(),
872        parents,
873        manifests: merged_manifest_ids,
874        author: author.to_string(),
875        message: message.to_string(),
876        timestamp,
877        public_key: Some(public_key_hex),
878        signature: None,
879    };
880
881    let signing_key = keys.signing_key;
882    let json_unsigned = serde_json::to_vec(&commit)?;
883    let signature = signing_key.sign(&json_unsigned);
884    commit.signature = Some(hex::encode(signature.to_bytes()));
885
886    let json_final = serde_json::to_vec(&commit)?;
887    let hash = blake3::hash(&json_final);
888    store.put_chunk(&crate::chunker::Chunk {
889        hash,
890        data: json_final,
891        offset: 0,
892    })?;
893
894    let merge_commit_id = hash.to_hex().to_string();
895
896    // Update HEAD and branch ref
897    if let Some(ref branch_name) = current_branch {
898        branch::update_branch_ref(&shard_dir, branch_name, &merge_commit_id)?;
899        branch::set_head_branch(&shard_dir, branch_name)?;
900    } else {
901        branch::set_head_commit(&shard_dir, &merge_commit_id)?;
902    }
903
904    info!("Merge commit {} ({})", merge_commit_id, message);
905    Ok(())
906}
907
908pub fn tag_list(path: &Path) -> Result<()> {
909    let shard_dir = path.join(".shard");
910    if !shard_dir.exists() {
911        anyhow::bail!("not a shard repository (run `shard init` first)");
912    }
913    let tags = load_tags(&shard_dir)?;
914    if tags.is_empty() {
915        info!("No tags.");
916    } else {
917        for (name, commit_id) in &tags {
918            info!("{} -> {}", name, commit_id);
919        }
920    }
921    Ok(())
922}
923
924fn collect_reachable(
925    store: &Store,
926    commit_id: &str,
927    seen_commits: &mut std::collections::HashSet<String>,
928    reachable: &mut std::collections::HashSet<String>,
929) -> Result<()> {
930    if !seen_commits.insert(commit_id.to_string()) {
931        return Ok(());
932    }
933
934    reachable.insert(commit_id.to_string());
935
936    let commit = match load_commit(store, commit_id) {
937        Ok(c) => c,
938        Err(_) => return Ok(()),
939    };
940
941    for manifest_id in &commit.manifests {
942        reachable.insert(manifest_id.clone());
943
944        if let Ok(data) = store.get_chunk(manifest_id) {
945            if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&data) {
946                for chunk_id in &manifest.chunks {
947                    reachable.insert(chunk_id.clone());
948                }
949            }
950        }
951    }
952
953    for parent_id in &commit.parents {
954        collect_reachable(store, parent_id, seen_commits, reachable)?;
955    }
956
957    Ok(())
958}
959
960pub fn prune(path: &Path) -> Result<()> {
961    let shard_dir = path.join(".shard");
962    if !shard_dir.exists() {
963        anyhow::bail!("not a shard repository (run `shard init` first)");
964    }
965
966    let store = Store::open(&shard_dir)?;
967    let mut reachable: std::collections::HashSet<String> = std::collections::HashSet::new();
968
969    // 1. Walk from HEAD commit (and all branch tips)
970    let (_, head_commit) = branch::resolve_head(&shard_dir)?;
971    if let Some(ref head) = head_commit {
972        collect_reachable(
973            &store,
974            head,
975            &mut std::collections::HashSet::new(),
976            &mut reachable,
977        )?;
978    }
979
980    // Also walk from all branch refs (in case HEAD is detached from any branch)
981    if let Ok(branches) = branch::list_branches(&shard_dir) {
982        for (_, commit_id) in branches.1 {
983            collect_reachable(
984                &store,
985                &commit_id,
986                &mut std::collections::HashSet::new(),
987                &mut reachable,
988            )?;
989        }
990    }
991
992    // 2. Walk from tags
993    let tags = load_tags(&shard_dir)?;
994    for commit_id in tags.values() {
995        collect_reachable(
996            &store,
997            commit_id,
998            &mut std::collections::HashSet::new(),
999            &mut reachable,
1000        )?;
1001    }
1002
1003    // 3. Walk from index (staged files)
1004    let index = Index::load(&shard_dir.join("index"))?;
1005    for manifest in index.files.values() {
1006        let json = serde_json::to_vec(manifest)?;
1007        let hash = blake3::hash(&json);
1008        let hash_hex = hash.to_hex().to_string();
1009        reachable.insert(hash_hex);
1010        for chunk_hash in &manifest.chunks {
1011            reachable.insert(chunk_hash.clone());
1012        }
1013    }
1014
1015    // 4. Scan objects and remove unreachable
1016    let all_chunks = store.iter_chunks()?;
1017    let mut pruned = 0u64;
1018    let mut kept = 0u64;
1019    for (hash_hex, full_path) in &all_chunks {
1020        if !reachable.contains(hash_hex) {
1021            store.delete_chunk(hash_hex, Some(full_path))?;
1022            pruned += 1;
1023        } else {
1024            kept += 1;
1025        }
1026    }
1027
1028    info!("Pruned {} objects. {} objects remain.", pruned, kept);
1029    Ok(())
1030}
1031
1032pub fn peer_add(path: &Path, multiaddr: &str) -> Result<()> {
1033    let shard_dir = path.join(".shard");
1034    if !shard_dir.exists() {
1035        anyhow::bail!("not a shard repository (run `shard init` first)");
1036    }
1037
1038    // Validate multiaddr format
1039    if multiaddr.is_empty() || multiaddr.parse::<shard_net::libp2p::Multiaddr>().is_err() {
1040        anyhow::bail!("invalid multiaddr '{}' (must be a valid libp2p multiaddr, e.g. /ip4/1.2.3.4/tcp/5678/p2p/...)", multiaddr);
1041    }
1042
1043    let peers_path = shard_dir.join("peers.json");
1044    let mut peers: Vec<String> = if peers_path.exists() {
1045        let data = fs::read(&peers_path)?;
1046        serde_json::from_slice(&data)?
1047    } else {
1048        Vec::new()
1049    };
1050
1051    if !peers.contains(&multiaddr.to_string()) {
1052        peers.push(multiaddr.to_string());
1053        let data = serde_json::to_vec(&peers)?;
1054        fs::write(peers_path, data)?;
1055        info!("Added peer: {}", multiaddr);
1056    } else {
1057        info!("Peer already exists: {}", multiaddr);
1058    }
1059
1060    Ok(())
1061}
1062
1063fn load_peers(shard_dir: &Path) -> Result<Vec<String>> {
1064    let peers_path = shard_dir.join("peers.json");
1065    if peers_path.exists() {
1066        let data = fs::read(peers_path)?;
1067        Ok(serde_json::from_slice(&data)?)
1068    } else {
1069        Ok(Vec::new())
1070    }
1071}
1072
/// Returns the location of the `authorized_keys` file inside `shard_dir`.
fn authorized_keys_path(shard_dir: &Path) -> std::path::PathBuf {
    let mut location = shard_dir.to_path_buf();
    location.push("authorized_keys");
    location
}
1076
1077fn load_authorized_keys(shard_dir: &Path) -> Result<Vec<ed25519_dalek::VerifyingKey>> {
1078    let path = authorized_keys_path(shard_dir);
1079    if !path.exists() {
1080        return Ok(Vec::new());
1081    }
1082    let content = fs::read_to_string(&path)?;
1083    let mut keys = Vec::new();
1084    for line in content.lines() {
1085        let line = line.trim();
1086        if line.is_empty() || line.starts_with('#') {
1087            continue;
1088        }
1089        let bytes = hex::decode(line)?;
1090        let arr: [u8; 32] = bytes
1091            .as_slice()
1092            .try_into()
1093            .map_err(|_| anyhow::anyhow!("Invalid public key length in authorized_keys"))?;
1094        keys.push(ed25519_dalek::VerifyingKey::from_bytes(&arr)?);
1095    }
1096    Ok(keys)
1097}
1098
1099pub fn add_authorized_key(shard_dir: &Path, public_key_hex: &str) -> Result<()> {
1100    // Validate the key
1101    let bytes = hex::decode(public_key_hex)?;
1102    let arr: [u8; 32] = bytes
1103        .as_slice()
1104        .try_into()
1105        .map_err(|_| anyhow::anyhow!("Public key must be 32 bytes (64 hex chars)"))?;
1106    let _pk = ed25519_dalek::VerifyingKey::from_bytes(&arr)?;
1107
1108    let path = authorized_keys_path(shard_dir);
1109    let mut content = if path.exists() {
1110        fs::read_to_string(&path)?
1111    } else {
1112        String::new()
1113    };
1114    // Check if key already exists
1115    if content.lines().any(|l| l.trim() == public_key_hex) {
1116        info!("Key already authorized");
1117        return Ok(());
1118    }
1119    content.push_str(public_key_hex);
1120    content.push('\n');
1121    fs::write(&path, content)?;
1122    info!("Authorized key added");
1123    Ok(())
1124}
1125
1126pub fn backup(path: &Path, output: &Path) -> Result<()> {
1127    let shard_dir = path.join(".shard");
1128    if !shard_dir.exists() {
1129        anyhow::bail!("not a shard repository (run `shard init` first)");
1130    }
1131    let file = fs::File::create(output)?;
1132    let encoder = flate2::write::GzEncoder::new(file, flate2::Compression::default());
1133    let mut archive = tar::Builder::new(encoder);
1134    archive.append_dir_all(".", &shard_dir)?;
1135    archive.finish()?;
1136    info!("Backup created: {}", output.display());
1137    Ok(())
1138}
1139
1140pub fn export(path: &Path, commit_id: &str, output_dir: &Path, json: bool) -> Result<()> {
1141    let shard_dir = path.join(".shard");
1142    if !shard_dir.exists() {
1143        anyhow::bail!("not a shard repository (run `shard init` first)");
1144    }
1145    let store = Store::open(&shard_dir)?;
1146    let commit = load_commit(&store, commit_id)?;
1147    let mut files = Vec::new();
1148    for manifest_id in &commit.manifests {
1149        let data = store.get_chunk(manifest_id)?;
1150        let manifest: FileManifest = serde_json::from_slice(&data)?;
1151        let compression = manifest.compression.parse::<Compression>()?;
1152        if !json {
1153            info!("Exporting file: {}", manifest.name);
1154        }
1155        let mut file_data = Vec::new();
1156        for chunk_id in &manifest.chunks {
1157            let chunk_data = store.get_chunk(chunk_id)?;
1158            let decompressed = compression.decompress(&chunk_data)?;
1159            file_data.extend_from_slice(&decompressed);
1160        }
1161        let out_path = output_dir.join(&manifest.name);
1162        if let Some(parent) = out_path.parent() {
1163            fs::create_dir_all(parent)?;
1164        }
1165        fs::write(&out_path, file_data)?;
1166        if !json {
1167            info!("  -> {}", out_path.display());
1168        }
1169        files.push(manifest.name);
1170    }
1171    if json {
1172        info!(
1173            "{}",
1174            serde_json::to_string(&serde_json::json!({
1175                "commit_id": commit_id,
1176                "files": files,
1177                "output_dir": output_dir.to_string_lossy(),
1178            }))?
1179        );
1180    } else {
1181        info!("Export complete.");
1182    }
1183    Ok(())
1184}
1185
1186pub fn import(path: &Path, source_dir: &Path, message: &str, author: &str) -> Result<()> {
1187    let shard_dir = path.join(".shard");
1188    if !shard_dir.exists() {
1189        anyhow::bail!("not a shard repository (run `shard init` first)");
1190    }
1191    // Walk files in source_dir
1192    let config = load_config(&shard_dir)?;
1193    let compression: Compression = config
1194        .get("compression")
1195        .map(|s| s.as_str())
1196        .unwrap_or("zstd")
1197        .parse()?;
1198    let chunker_mode = chunker::ChunkerMode::from_config(&config);
1199    let store = Store::open(&shard_dir)?;
1200    let mut index = Index::load(&shard_dir.join("index"))?;
1201    if !source_dir.is_dir() {
1202        anyhow::bail!("Source must be a directory");
1203    }
1204    for entry in walkdir::WalkDir::new(source_dir)
1205        .into_iter()
1206        .filter_entry(|e| {
1207            e.file_name()
1208                .to_str()
1209                .map(|s| !s.starts_with('.'))
1210                .unwrap_or(false)
1211        })
1212    {
1213        let entry = entry?;
1214        if entry.file_type().is_file() {
1215            add_file(
1216                path,
1217                entry.path(),
1218                &store,
1219                &mut index,
1220                &compression,
1221                &chunker_mode,
1222            )?;
1223        }
1224    }
1225    index.save(&shard_dir.join("index"))?;
1226    // Auto-commit
1227    if !index.files.is_empty() {
1228        commit(path, message, author)?;
1229    } else {
1230        info!("No files found to import.");
1231    }
1232    Ok(())
1233}
1234
1235pub fn restore(path: &Path, backup_file: &Path) -> Result<()> {
1236    let shard_dir = path.join(".shard");
1237    if shard_dir.exists() {
1238        anyhow::bail!(
1239            "Repository already exists — remove .shard first or use a different directory"
1240        );
1241    }
1242    let file = fs::File::open(backup_file)?;
1243    let decoder = flate2::read::GzDecoder::new(file);
1244    let mut archive = tar::Archive::new(decoder);
1245    archive.unpack(path)?;
1246    // Verify the result
1247    if !path.join(".shard").exists() {
1248        anyhow::bail!("Backup does not contain a valid .shard directory");
1249    }
1250    info!("Restored from {}", backup_file.display());
1251    Ok(())
1252}
1253
/// Backs the peer-to-peer content provider with a local repository.
struct RepoProvider {
    /// Object store used for manifest and chunk lookups and for storing
    /// chunks received from peers.
    store: Store,
    /// Path of the repository's `.shard` directory; used to locate the
    /// `authorized_keys` file and the `keys` directory.
    shard_dir: std::path::PathBuf,
}
1258
1259impl shard_net::p2p::ShardContentProvider for RepoProvider {
1260    fn get_manifest(&self, id: &str) -> Option<Vec<u8>> {
1261        self.store.get_chunk(id).ok()
1262    }
1263    fn get_chunk(&self, id: &str) -> Option<Vec<u8>> {
1264        self.store.get_chunk(id).ok()
1265    }
1266    fn put_chunk(&mut self, id: &str, data: &[u8]) -> bool {
1267        let hash = blake3::hash(data);
1268        let hex = hash.to_hex().to_string();
1269        if hex != id {
1270            return false;
1271        }
1272        self.store
1273            .put_chunk(&crate::chunker::Chunk {
1274                hash,
1275                data: data.to_vec(),
1276                offset: 0,
1277            })
1278            .is_ok()
1279    }
1280    fn verify_auth(&self, public_key: &[u8], nonce: &[u8], signature: &[u8]) -> bool {
1281        use ed25519_dalek::Verifier;
1282        let pk_bytes: [u8; 32] = match public_key.try_into() {
1283            Ok(b) => b,
1284            Err(_) => return false,
1285        };
1286        let pk = match ed25519_dalek::VerifyingKey::from_bytes(&pk_bytes) {
1287            Ok(k) => k,
1288            Err(_) => return false,
1289        };
1290        let sig_bytes: [u8; 64] = match signature.try_into() {
1291            Ok(b) => b,
1292            Err(_) => return false,
1293        };
1294        let sig = ed25519_dalek::Signature::from_bytes(&sig_bytes);
1295        if pk.verify(nonce, &sig).is_err() {
1296            return false;
1297        }
1298        // Check authorized_keys if the file exists
1299        if let Ok(keys) = load_authorized_keys(&self.shard_dir) {
1300            if !keys.is_empty() {
1301                return keys.contains(&pk);
1302            }
1303        }
1304        true
1305    }
1306    fn repo_public_key(&self) -> Option<Vec<u8>> {
1307        let keys = shard_crypto::KeyPair::load(&self.shard_dir.join("keys")).ok()?;
1308        Some(keys.verifying_key.to_bytes().to_vec())
1309    }
1310}
1311
1312pub async fn share(path: &Path) -> Result<()> {
1313    let shard_dir = path.join(".shard");
1314    if !shard_dir.exists() {
1315        anyhow::bail!("not a shard repository (run `shard init` first)");
1316    }
1317
1318    let mut node = shard_net::p2p::Node::new().await?;
1319
1320    // Bootstrap from peers
1321    let peers = load_peers(&shard_dir)?;
1322    for peer in peers {
1323        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1324            let _ = node.swarm.dial(addr);
1325        }
1326    }
1327
1328    node.listen("/ip4/0.0.0.0/tcp/0").await?; // Listen on random port (TCP)
1329
1330    // In a real implementation, we would load the repo and serve requests.
1331    // For now, we just start the node to prove connectivity.
1332    info!("Sharing repository...");
1333    let store = Store::open(&shard_dir)?;
1334    let provider = RepoProvider {
1335        store,
1336        shard_dir: shard_dir.clone(),
1337    };
1338    node.run(provider).await;
1339
1340    Ok(())
1341}
1342
1343pub async fn sync(path: &Path) -> Result<()> {
1344    let shard_dir = path.join(".shard");
1345    if !shard_dir.exists() {
1346        anyhow::bail!("not a shard repository (run `shard init` first)");
1347    }
1348
1349    let config = load_config(&shard_dir)?;
1350    let repo_id = config
1351        .get("repo_id")
1352        .ok_or_else(|| anyhow::anyhow!("No repo_id in config. Run `shard init` to create one."))?;
1353    let topic_str = format!("/shard/repo/{}", repo_id);
1354    let topic = shard_net::libp2p::gossipsub::IdentTopic::new(topic_str);
1355
1356    let mut node = shard_net::p2p::Node::new().await?;
1357    node.subscribe(&topic)?;
1358    node.listen("/ip4/0.0.0.0/tcp/0").await?;
1359
1360    // Bootstrap from configured peers
1361    let peers = load_peers(&shard_dir)?;
1362    for peer in peers {
1363        if let Ok(addr) = peer.parse::<shard_net::libp2p::Multiaddr>() {
1364            let _ = node.swarm.dial(addr);
1365        }
1366    }
1367
1368    let head_commit = branch::resolve_head(&shard_dir)?.1;
1369
1370    // Initial announce (may fail with InsufficientPeers if no peers yet)
1371    if let Some(ref head) = head_commit {
1372        let msg = format!("announce:{}", head);
1373        match node.publish(&topic, msg.as_bytes()) {
1374            Ok(_) => info!("Announced commit {} on sync topic", head),
1375            Err(e) => error!("Initial announce (will retry): {}", e),
1376        }
1377    } else {
1378        info!("No commits to announce");
1379    }
1380
1381    info!("Syncing on topic with peer id: {}", node.local_peer_id());
1382    let _ = std::io::stdout().flush();
1383
1384    let store = Store::open(&shard_dir)?;
1385    let mut provider = RepoProvider {
1386        store,
1387        shard_dir: shard_dir.clone(),
1388    };
1389
1390    let mut interval = tokio::time::interval(Duration::from_secs(5));
1391    let mut address_book: HashMap<shard_net::libp2p::PeerId, Vec<shard_net::libp2p::Multiaddr>> =
1392        HashMap::new();
1393    let path_buf = path.to_path_buf();
1394
1395    loop {
1396        tokio::select! {
1397            _ = tokio::signal::ctrl_c() => {
1398                info!("\nSync shutting down...");
1399                break Ok(());
1400            }
1401            _ = interval.tick() => {
1402                if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
1403                    let msg = format!("announce:{}", head);
1404                    match node.publish(&topic, msg.as_bytes()) {
1405                        Ok(_) => info!("Re-announced commit {} on sync topic", head),
1406                        Err(e) => error!("Re-announce failed: {}", e),
1407                    }
1408                }
1409            }
1410            event = node.swarm.select_next_some() => {
1411                match event {
1412                    shard_net::libp2p::swarm::SwarmEvent::NewListenAddr { address, .. } => {
1413                        info!("Listening on {address:?}");
1414                        let _ = std::io::stdout().flush();
1415                    }
1416                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1417                        shard_net::p2p::ShardBehaviourEvent::Mdns(
1418                            shard_net::libp2p::mdns::Event::Discovered(list),
1419                        ),
1420                    ) => {
1421                        for (peer_id, multiaddr) in list {
1422                            info!("mDNS discovered: {peer_id} {multiaddr}");
1423                            address_book.entry(peer_id).or_default().push(multiaddr.clone());
1424                            node.swarm
1425                                .behaviour_mut()
1426                                .gossipsub
1427                                .add_explicit_peer(&peer_id);
1428                            node.swarm
1429                                .behaviour_mut()
1430                                .kademlia
1431                                .add_address(&peer_id, multiaddr);
1432                        }
1433                    }
1434                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1435                        shard_net::p2p::ShardBehaviourEvent::Mdns(shard_net::libp2p::mdns::Event::Expired(
1436                            list,
1437                        )),
1438                    ) => {
1439                        for (peer_id, _multiaddr) in list {
1440                            info!("mDNS expired: {peer_id}");
1441                            node.swarm
1442                                .behaviour_mut()
1443                                .gossipsub
1444                                .remove_explicit_peer(&peer_id);
1445                        }
1446                    }
1447                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1448                        shard_net::p2p::ShardBehaviourEvent::Gossipsub(
1449                            shard_net::libp2p::gossipsub::Event::Message {
1450                                propagation_source,
1451                                message,
1452                                ..
1453                            },
1454                        ),
1455                    ) => {
1456                        if let Ok(text) = String::from_utf8(message.data.clone()) {
1457                            if let Some(commit_id) = text.strip_prefix("announce:") {
1458                                info!(
1459                                    "Peer {} announced commit: {}",
1460                                    propagation_source, commit_id
1461                                );
1462                                let peer = propagation_source;
1463                                let commit_id_owned = commit_id.to_string();
1464                                // Reply with our HEAD if different (triggers peer to pull from us)
1465                                let our_head = branch::resolve_head(&shard_dir)?.1.unwrap_or_default();
1466                                if our_head != commit_id_owned {
1467                                    let msg = format!("announce:{}", our_head);
1468                                    let _ = node.publish(&topic, msg.as_bytes());
1469                                }
1470                                if let Some(addrs) = address_book.get(&peer) {
1471                                    if let Some(addr) = addrs.first() {
1472                                        let multiaddr_str = format!("{}/p2p/{}", addr, peer);
1473                                        let path_clone = path_buf.clone();
1474                                        tokio::spawn(async move {
1475                                            match pull(&path_clone, &multiaddr_str, &commit_id_owned).await {
1476                                                Ok(_) => info!("Auto-pulled commit {} from {}", commit_id_owned, peer),
1477                                                Err(e) => error!("Auto-pull failed for commit {} from {}: {}", commit_id_owned, peer, e),
1478                                            }
1479                                        });
1480                                    }
1481                                }
1482                            }
1483                        }
1484                    }
1485                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1486                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1487                            shard_net::libp2p::request_response::Event::Message { peer, message },
1488                        ),
1489                    ) => {
1490                        if let shard_net::libp2p::request_response::Message::Request {
1491                            request, channel, ..
1492                        } = message
1493                        {
1494                            info!("Received request from {}", peer);
1495                            node.serve_request(&peer, &mut provider, request, channel);
1496                        } else {
1497                            info!("Received Response from {}", peer);
1498                        }
1499                    }
1500                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1501                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1502                            shard_net::libp2p::request_response::Event::OutboundFailure {
1503                                peer, error, ..
1504                            },
1505                        ),
1506                    ) => {
1507                        error!("Outbound failure to {}: {:?}", peer, error);
1508                    }
1509                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1510                        shard_net::p2p::ShardBehaviourEvent::RequestResponse(
1511                            shard_net::libp2p::request_response::Event::InboundFailure {
1512                                peer, error, ..
1513                            },
1514                        ),
1515                    ) => {
1516                        error!("Inbound failure from {}: {:?}", peer, error);
1517                    }
1518                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1519                        shard_net::p2p::ShardBehaviourEvent::Identify(
1520                            shard_net::libp2p::identify::Event::Received { peer_id, info },
1521                        ),
1522                    ) => {
1523                        info!("Identify received from {}: {:?}", peer_id, info.listen_addrs);
1524                        for addr in info.listen_addrs {
1525                            address_book.entry(peer_id).or_default().push(addr);
1526                        }
1527                        let _ = std::io::stdout().flush();
1528                    }
1529                    shard_net::libp2p::swarm::SwarmEvent::Behaviour(
1530                        shard_net::p2p::ShardBehaviourEvent::Identify(event),
1531                    ) => {
1532                        info!("Identify event: {:?}", event);
1533                    }
1534                    shard_net::libp2p::swarm::SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
1535                        info!("Connection established with {}", peer_id);
1536                        // Only store the address when we dialed (it's the peer's listen addr).
1537                        // For listener connections, send_back_addr is the ephemeral port — useless for dialing back.
1538                        if let shard_net::libp2p::core::ConnectedPoint::Dialer { address, .. } = &endpoint {
1539                            address_book.entry(peer_id).or_default().push(address.clone());
1540                        }
1541                        node.swarm
1542                            .behaviour_mut()
1543                            .gossipsub
1544                            .add_explicit_peer(&peer_id);
1545                        // Announce HEAD to the newly connected peer
1546                        if let Some(ref head) = branch::resolve_head(&shard_dir)?.1 {
1547                            let msg = format!("announce:{}", head);
1548                            let _ = node.publish(&topic, msg.as_bytes());
1549                        }
1550                    }
1551                    shard_net::libp2p::swarm::SwarmEvent::IncomingConnection {
1552                        local_addr,
1553                        send_back_addr,
1554                        ..
1555                    } => {
1556                        info!(
1557                            "Incoming connection from {} to {}",
1558                            send_back_addr, local_addr
1559                        );
1560                    }
1561                    e => {
1562                        info!("Event: {:?}", e);
1563                    }
1564                }
1565            }
1566        }
1567    }
1568}
1569
1570pub async fn pull(path: &Path, peer: &str, commit_id: &str) -> Result<()> {
1571    let shard_dir = path.join(".shard");
1572    // pull can work on empty repo or existing one.
1573    // if !shard_dir.exists() { init(path)?; }
1574
1575    if !shard_dir.exists() {
1576        init(path, "flat", "zstd", "fixed", None)?;
1577    }
1578
1579    let store = Store::open(&shard_dir)?;
1580
1581    let mut node = shard_net::p2p::Node::new().await?;
1582
1583    // Parse peer multiaddr
1584    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
1585    let peer_id = match multiaddr.iter().last() {
1586        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
1587        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
1588    };
1589
1590    // 1. Get Commit (sequential — single request)
1591    info!("Pulling commit {} from {}...", commit_id, peer);
1592    let commit_data = node
1593        .request_manifest(&multiaddr, peer_id, commit_id.to_string())
1594        .await?;
1595    let hash = blake3::hash(&commit_data);
1596    if hash.to_hex().to_string() != commit_id {
1597        anyhow::bail!("Commit hash mismatch");
1598    }
1599    let chunk = crate::chunker::Chunk {
1600        hash,
1601        data: commit_data.clone(),
1602        offset: 0,
1603    };
1604    store.put_chunk(&chunk)?;
1605
1606    let commit: Commit = serde_json::from_slice(&commit_data)?;
1607    info!("Got commit: {}", commit.message);
1608
1609    // Set repo_id from commit's public key so clones share the gossipsub topic
1610    if let Some(pk_hex) = &commit.public_key {
1611        let pk_bytes = hex::decode(pk_hex)?;
1612        let repo_id = blake3::hash(&pk_bytes).to_hex().to_string();
1613        let mut config = load_config(&shard_dir)?;
1614        config.insert("repo_id".to_string(), repo_id);
1615        save_config(&shard_dir, &config)?;
1616    }
1617
1618    // 2. Fetch all manifests in parallel
1619    let manifest_requests: Vec<(String, shard_net::protocol::ShardRequest)> = commit
1620        .manifests
1621        .iter()
1622        .map(|id| {
1623            (
1624                id.clone(),
1625                shard_net::protocol::ShardRequest::GetManifest(id.clone()),
1626            )
1627        })
1628        .collect();
1629    let manifest_results = node
1630        .request_parallel(&multiaddr, peer_id, manifest_requests)
1631        .await?;
1632
1633    let mut all_chunk_ids: Vec<String> = Vec::new();
1634    let mut file_manifests: Vec<FileManifest> = Vec::new();
1635    // Map chunk_id -> compression type for verification in step 3
1636    let mut chunk_compression: HashMap<String, String> = HashMap::new();
1637
1638    for (manifest_id, manifest_data) in &manifest_results {
1639        let hash = blake3::hash(manifest_data);
1640        if hash.to_hex().to_string() != *manifest_id {
1641            anyhow::bail!("Manifest hash mismatch: {}", manifest_id);
1642        }
1643        let chunk = crate::chunker::Chunk {
1644            hash,
1645            data: manifest_data.clone(),
1646            offset: 0,
1647        };
1648        store.put_chunk(&chunk)?;
1649        let manifest: FileManifest = serde_json::from_slice(manifest_data)?;
1650        info!(
1651            "Fetching file: {} (compression: {})",
1652            manifest.name, manifest.compression
1653        );
1654        for cid in &manifest.chunks {
1655            chunk_compression.insert(cid.clone(), manifest.compression.clone());
1656        }
1657        all_chunk_ids.extend(manifest.chunks.clone());
1658        file_manifests.push(manifest);
1659    }
1660
1661    // 3. Fetch all missing chunks in parallel
1662    let needed_chunks: Vec<String> = all_chunk_ids
1663        .into_iter()
1664        .filter(|id| store.get_chunk(id).is_err())
1665        .collect();
1666
1667    if !needed_chunks.is_empty() {
1668        info!("Fetching {} chunks...", needed_chunks.len());
1669        let chunk_requests: Vec<(String, shard_net::protocol::ShardRequest)> = needed_chunks
1670            .iter()
1671            .map(|id| {
1672                (
1673                    id.clone(),
1674                    shard_net::protocol::ShardRequest::GetChunk(id.clone()),
1675                )
1676            })
1677            .collect();
1678        let chunk_results = node
1679            .request_parallel(&multiaddr, peer_id, chunk_requests)
1680            .await?;
1681        for (chunk_id, chunk_data) in &chunk_results {
1682            // Determine compression from the manifest this chunk belongs to
1683            let compression: Compression = chunk_compression
1684                .get(chunk_id)
1685                .map(|s| s.as_str())
1686                .unwrap_or("none")
1687                .parse()?;
1688            // Decompress to verify the content hash
1689            let decompressed = compression.decompress(chunk_data)?;
1690            let hash = blake3::hash(&decompressed);
1691            if hash.to_hex().to_string() != *chunk_id {
1692                anyhow::bail!("Chunk hash mismatch: {}", chunk_id);
1693            }
1694            // Store the compressed data (as received)
1695            let chunk = crate::chunker::Chunk {
1696                hash,
1697                data: chunk_data.clone(),
1698                offset: 0,
1699            };
1700            store.put_chunk(&chunk)?;
1701        }
1702    }
1703
1704    // 4. Reconstruct all files
1705    for manifest in &file_manifests {
1706        let compression = manifest.compression.parse::<Compression>()?;
1707        let mut file_data = Vec::new();
1708        for chunk_id in &manifest.chunks {
1709            let compressed = store.get_chunk(chunk_id)?;
1710            let decompressed = compression.decompress(&compressed)?;
1711            file_data.extend_from_slice(&decompressed);
1712        }
1713        fs::write(path.join(&manifest.name), file_data)?;
1714        info!(
1715            "Reconstructed file: {} ({} bytes)",
1716            manifest.name, manifest.size
1717        );
1718    }
1719
1720    info!("Pull complete.");
1721    Ok(())
1722}
1723
1724pub async fn push(path: &Path, peer: &str) -> Result<()> {
1725    let shard_dir = path.join(".shard");
1726    if !shard_dir.exists() {
1727        anyhow::bail!("not a shard repository (run `shard init` first)");
1728    }
1729
1730    let (_, head_id) = branch::resolve_head(&shard_dir)?;
1731    let head_id = head_id.ok_or_else(|| anyhow::anyhow!("No commits to push"))?;
1732
1733    let store = Store::open(&shard_dir)?;
1734
1735    // Collect all reachable objects
1736    let mut objects: std::collections::BTreeMap<String, Vec<u8>> =
1737        std::collections::BTreeMap::new();
1738
1739    // Walk commits
1740    let mut seen = std::collections::HashSet::new();
1741    let mut stack = vec![head_id.clone()];
1742    while let Some(cid) = stack.pop() {
1743        if !seen.insert(cid.clone()) {
1744            continue;
1745        }
1746        if let Ok(data) = store.get_chunk(&cid) {
1747            objects.insert(cid, data.clone());
1748            if let Ok(commit) = serde_json::from_slice::<Commit>(&data) {
1749                for mid in &commit.manifests {
1750                    if let Ok(manifest_data) = store.get_chunk(mid) {
1751                        objects.insert(mid.clone(), manifest_data.clone());
1752                        if let Ok(manifest) = serde_json::from_slice::<FileManifest>(&manifest_data)
1753                        {
1754                            for cid in &manifest.chunks {
1755                                if let Ok(chunk_data) = store.get_chunk(cid) {
1756                                    objects.insert(cid.clone(), chunk_data);
1757                                }
1758                            }
1759                        }
1760                    }
1761                }
1762                for parent in &commit.parents {
1763                    stack.push(parent.clone());
1764                }
1765            }
1766        }
1767    }
1768
1769    info!(
1770        "Pushing {} objects ({} bytes)...",
1771        objects.len(),
1772        objects.values().map(|v| v.len() as u64).sum::<u64>()
1773    );
1774
1775    // Connect and send all objects
1776    let mut node = shard_net::p2p::Node::new().await?;
1777    let multiaddr: shard_net::libp2p::Multiaddr = peer.parse()?;
1778    let peer_id = match multiaddr.iter().last() {
1779        Some(shard_net::libp2p::multiaddr::Protocol::P2p(peer_id)) => peer_id,
1780        _ => anyhow::bail!("Multiaddr must end with /p2p/<peer_id>"),
1781    };
1782
1783    for (id, data) in &objects {
1784        node.request_put_chunk(&multiaddr, peer_id, id.clone(), data.clone())
1785            .await?;
1786    }
1787
1788    info!("Push complete ({} objects).", objects.len());
1789    Ok(())
1790}