use std::path::{Path, PathBuf};
use anyhow::{bail, Context, Result};
use bao_tree::ChunkRanges;
use futures_lite::StreamExt;
use iroh::{endpoint::presets, protocol::Router, Endpoint, EndpointAddr, EndpointId};
use iroh_blobs::{
api::blobs::{AddPathOptions, BlobStatus, ImportMode},
format::collection::Collection,
store::{fs::FsStore, GcConfig},
BlobFormat, Hash, HashAndFormat,
};
use std::time::Duration;
use tracing::info;
use walkdir::WalkDir;
use super::protocol::{encode_ranges_wire, RingGate, Status, SC_ALPN};
use crate::config::Config;
use crate::registry::Registry;
use crate::ticket::ShareTicket;
pub struct Node {
pub endpoint: Endpoint,
pub store: FsStore,
pub registry: Registry,
router: Router,
}
impl Node {
pub async fn start(data_dir: impl AsRef<Path>) -> Result<Self> {
let data_dir = data_dir.as_ref().to_path_buf();
tokio::fs::create_dir_all(&data_dir).await?;
let cfg = Config::load_or_create(&data_dir).context("loading config")?;
let endpoint = Endpoint::builder(presets::N0)
.secret_key(cfg.secret_key)
.bind()
.await
.context("binding iroh endpoint")?;
let blobs_dir = data_dir.join("blobs");
let db_path = blobs_dir.join("blobs.db");
let mut fs_opts = iroh_blobs::store::fs::options::Options::new(&blobs_dir);
fs_opts.gc = Some(GcConfig {
interval: Duration::from_secs(30),
add_protected: None,
});
let store = FsStore::load_with_opts(db_path, fs_opts)
.await
.context("loading FsStore")?;
let registry = Registry::open(data_dir.join("registry.redb"), endpoint.id())
.context("opening registry")?;
let gate = RingGate::new(registry.clone(), store.clone());
let router = Router::builder(endpoint.clone())
.accept(SC_ALPN, gate)
.spawn();
endpoint.online().await;
info!(peer_id = %endpoint.id(), "node online");
Ok(Node {
endpoint,
store,
registry,
router,
})
}
pub fn peer_id(&self) -> EndpointId {
self.endpoint.id()
}
pub fn node_addr(&self) -> EndpointAddr {
self.endpoint.addr()
}
pub async fn import_file(&self, path: impl AsRef<Path>) -> Result<(Hash, BlobFormat)> {
let path = std::path::absolute(path.as_ref())?;
info!(path = %path.display(), "importing file");
let tag = self
.store
.blobs()
.add_path_with_opts(AddPathOptions {
path,
mode: ImportMode::TryReference,
format: BlobFormat::Raw,
})
.temp_tag()
.await
.context("add_path")?;
let hash = tag.hash();
let format = BlobFormat::Raw;
self.store
.tags()
.set(hash.to_string(), HashAndFormat { hash, format })
.await
.context("pinning blob tag")?;
info!(%hash, "imported — outboard computed");
Ok((hash, format))
}
pub async fn import_directory(&self, dir: impl AsRef<Path>) -> Result<(Hash, BlobFormat)> {
let dir = dir.as_ref();
info!(dir = %dir.display(), "importing directory");
let mut files: Vec<(String, PathBuf)> = Vec::new();
for entry in WalkDir::new(dir)
.follow_links(false)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
{
let rel = entry
.path()
.strip_prefix(dir)
.unwrap_or(entry.path())
.to_string_lossy()
.into_owned();
files.push((rel, entry.path().to_path_buf()));
}
let mut collection = Collection::default();
for (name, path) in files {
let tag = self
.store
.blobs()
.add_path_with_opts(AddPathOptions {
path: std::path::absolute(&path)?,
mode: ImportMode::TryReference,
format: BlobFormat::Raw,
})
.temp_tag()
.await?;
info!(name, hash = %tag.hash(), "added to collection");
collection.push(name, tag.hash());
}
let col_tag = collection.store(&self.store).await?;
let hash = col_tag.hash();
let format = BlobFormat::HashSeq;
self.store
.tags()
.set(hash.to_string(), HashAndFormat { hash, format })
.await
.context("pinning collection tag")?;
info!(%hash, "collection stored");
Ok((hash, format))
}
pub async fn list_blobs(&self) -> Result<Vec<(Hash, BlobFormat)>> {
let mut stream = self.store.tags().list().await?;
let mut blobs = Vec::new();
while let Some(item) = stream.next().await {
let info = item?;
blobs.push((info.hash, info.format));
}
Ok(blobs)
}
pub async fn delete_blob(&self, hash: Hash) -> Result<()> {
self.store
.tags()
.delete(hash.to_string())
.await
.context("removing blob tag")?;
Ok(())
}
pub fn make_ticket(&self, hash: Hash, format: BlobFormat, name: Option<String>) -> ShareTicket {
let addr = self.node_addr();
match format {
BlobFormat::HashSeq => ShareTicket::new_collection(addr, hash, name),
_ => ShareTicket::new(addr, hash, name),
}
}
pub async fn download(&self, ticket: &ShareTicket, dest: impl AsRef<Path>) -> Result<()> {
let dest = dest.as_ref().to_path_buf();
let hash = ticket.hash();
let format = ticket.format();
let node_addr = ticket.node_addr().clone();
info!(hash = %hash, from = %node_addr.id, "starting download");
let already_have: ChunkRanges = match self.store.blobs().status(hash).await {
Ok(BlobStatus::Complete { .. }) => ChunkRanges::all(),
_ => ChunkRanges::default(),
};
let missing = ChunkRanges::all() & !already_have.clone();
if missing.is_empty() {
info!(hash = %hash, "all chunks present — skipping download");
return self.export(hash, format, &ticket.name, &dest).await;
}
info!(hash = %hash, "sending range request");
let conn = self
.endpoint
.connect(node_addr, SC_ALPN)
.await
.context("connecting to sender")?;
let (mut send, mut recv) = conn.open_bi().await?;
send.write_all(hash.as_bytes()).await?;
send.write_all(&encode_ranges_wire(&already_have)).await?;
send.finish()?;
let mut status_byte = [0u8; 1];
recv.read_exact(&mut status_byte)
.await
.context("reading status")?;
match Status::try_from(status_byte[0])? {
Status::Denied => bail!(
"access denied — not in a ring for this blob.\n\
Your PeerId: {}",
self.peer_id()
),
Status::Allowed => {}
}
let _ = self
.store
.blobs()
.import_bao_reader(hash, missing, &mut recv)
.await
.context("decoding and storing bao stream")?;
info!(hash = %hash, "all chunks verified and written");
self.export(hash, format, &ticket.name, &dest).await
}
async fn export(
&self,
hash: Hash,
format: BlobFormat,
name: &Option<String>,
dest: &Path,
) -> Result<()> {
let export_path = if dest.is_dir() {
dest.join(name.as_deref().unwrap_or("download"))
} else {
dest.to_path_buf()
};
let export_path = std::path::absolute(&export_path)?;
match format {
BlobFormat::HashSeq => {
tokio::fs::create_dir_all(&export_path).await?;
let collection = Collection::load(hash, &*self.store)
.await
.context("loading collection")?;
for (name, blob_hash) in collection.iter() {
let target = export_path.join(name);
if let Some(parent) = target.parent() {
tokio::fs::create_dir_all(parent).await?;
}
self.store
.blobs()
.export(*blob_hash, &target)
.finish()
.await
.with_context(|| format!("exporting {name}"))?;
}
}
_ => {
self.store
.blobs()
.export(hash, &export_path)
.finish()
.await
.context("exporting blob")?;
}
}
info!("export complete");
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
self.router.shutdown().await?;
Ok(())
}
}