use anyhow::Result;
use hashtree_blossom::BlossomClient;
use hashtree_config::detect_local_daemon_url;
use hashtree_core::{to_hex, Cid, HashTree, HashTreeConfig, Link};
use nostr::Keys;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tracing::debug;
use crate::config::Config as CliConfig;
use crate::storage::HashtreeStore;
use crate::webrtc::WebRTCState;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Derives the [`Cid`] of the child that `link` points to from its `parent`.
///
/// The child always takes the link's hash. Its key is the link's own key when
/// the link carries one. Otherwise it falls back to the parent's key, but only
/// in two cases:
/// - the link name starts with `_chunk_`;
/// - the link is a tree link whose name is `_` followed by exactly one character.
///
/// In every other case the child has no key.
fn child_cid(parent: &Cid, link: &Link) -> Cid {
    let inherits_parent_key = match link.name.as_deref() {
        None => false,
        Some(name) if name.starts_with("_chunk_") => true,
        Some(name) => {
            name.starts_with('_') && name.chars().count() == 2 && link.link_type.is_tree()
        }
    };
    let key = match link.key {
        Some(own_key) => Some(own_key),
        None if inherits_parent_key => parent.key,
        None => None,
    };
    Cid {
        hash: link.hash,
        key,
    }
}
/// Atomic counters describing how much a tree fetch has downloaded so far.
///
/// Pass a reference to the `*_with_progress` fetch methods and poll
/// [`FetchProgress::snapshot`] from elsewhere to observe progress.
#[derive(Debug, Default)]
pub struct FetchProgress {
    /// Number of chunks recorded via `record_chunk`.
    chunks_fetched: AtomicUsize,
    /// Sum of the byte lengths passed to `record_chunk`.
    bytes_fetched: AtomicU64,
}

/// Plain copy of the counters held by a [`FetchProgress`] at one moment.
#[derive(Debug, Clone, Copy, Default)]
pub struct FetchProgressSnapshot {
    /// Chunks recorded when the snapshot was taken.
    pub chunks_fetched: usize,
    /// Bytes recorded when the snapshot was taken.
    pub bytes_fetched: u64,
}

impl FetchProgress {
    /// Creates a tracker with both counters at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Reads both counters.
    ///
    /// The two loads are independent relaxed reads. A snapshot taken while
    /// chunks are being recorded may therefore pair a chunk count and a byte
    /// total that were not observed at exactly the same instant.
    pub fn snapshot(&self) -> FetchProgressSnapshot {
        let chunks_fetched = self.chunks_fetched.load(Ordering::Relaxed);
        let bytes_fetched = self.bytes_fetched.load(Ordering::Relaxed);
        FetchProgressSnapshot {
            chunks_fetched,
            bytes_fetched,
        }
    }

    /// Counts one more chunk of `byte_len` bytes.
    fn record_chunk(&self, byte_len: usize) {
        let added_bytes = byte_len as u64;
        self.chunks_fetched.fetch_add(1, Ordering::Relaxed);
        self.bytes_fetched.fetch_add(added_bytes, Ordering::Relaxed);
    }
}
/// Timeouts used by `Fetcher` when retrieving chunks.
///
/// `Debug` is derived so the configuration can be logged and inspected.
#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Upper bound on each WebRTC peer request. When it elapses, the fetch
    /// falls back to Blossom.
    pub webrtc_timeout: Duration,
    /// Timeout handed to the Blossom client via `BlossomClient::with_timeout`.
    pub blossom_timeout: Duration,
}

impl Default for FetchConfig {
    /// Returns 2 s for WebRTC peer requests and 10 s for Blossom.
    fn default() -> Self {
        Self {
            webrtc_timeout: Duration::from_millis(2000),
            blossom_timeout: Duration::from_millis(10000),
        }
    }
}
/// Retrieves content-addressed chunks, asking WebRTC peers before Blossom
/// servers, and uploads blobs to Blossom.
pub struct Fetcher {
    /// Blossom client used for downloads and uploads.
    blossom: BlossomClient,
    /// Fetch timeouts. `webrtc_timeout` bounds each peer request.
    config: FetchConfig,
}
impl Fetcher {
    /// Creates a fetcher whose Blossom client is built from freshly generated
    /// `Keys`.
    ///
    /// Equivalent to [`Fetcher::with_keys`] with `Keys::generate()`.
    pub fn new(config: FetchConfig) -> Self {
        Self::with_keys(config, Keys::generate())
    }

    /// Creates a fetcher whose Blossom client is built from `keys`.
    ///
    /// The client is configured with `with_timeout(config.blossom_timeout)`.
    /// It is then passed through `with_local_daemon_read`, which prepends a
    /// detected local daemon to its read servers.
    pub fn with_keys(config: FetchConfig, keys: Keys) -> Self {
        let blossom = BlossomClient::new(keys).with_timeout(config.blossom_timeout);
        let blossom = with_local_daemon_read(blossom);
        Self { config, blossom }
    }

    /// Returns the underlying Blossom client.
    pub fn blossom(&self) -> &BlossomClient {
        &self.blossom
    }

    /// Fetches the chunk whose hex-encoded hash is `hash_hex`.
    ///
    /// When `webrtc_state` is given, peers are asked first, bounded by
    /// `FetchConfig::webrtc_timeout`. If that times out or no peer returns
    /// data, it falls back to `BlossomClient::download`.
    ///
    /// # Errors
    ///
    /// Returns an error when the Blossom download fails.
    pub async fn fetch_chunk(
        &self,
        webrtc_state: Option<&Arc<WebRTCState>>,
        hash_hex: &str,
    ) -> Result<Vec<u8>> {
        // Used only as a log prefix. `get` returns `None` instead of panicking
        // when the string is shorter than 12 bytes or byte 12 is not a char
        // boundary (callers may pass arbitrary `&str`).
        let short_hash = hash_hex.get(..12).unwrap_or(hash_hex);

        if let Some(state) = webrtc_state {
            debug!("Trying WebRTC for {}", short_hash);
            let webrtc_result = tokio::time::timeout(
                self.config.webrtc_timeout,
                state.request_from_peers(hash_hex),
            )
            .await;
            if let Ok(Some(data)) = webrtc_result {
                debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
                return Ok(data);
            }
        }

        debug!("Trying Blossom for {}", short_hash);
        match self.blossom.download(hash_hex).await {
            Ok(data) => {
                debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
                Ok(data)
            }
            Err(e) => {
                debug!("Blossom download failed for {}: {}", short_hash, e);
                Err(anyhow::anyhow!(
                    "Failed to fetch {} from any source: {}",
                    short_hash,
                    e
                ))
            }
        }
    }

    /// Returns the chunk `hash` from `store`, fetching and caching it when absent.
    ///
    /// A missing chunk is retrieved with [`Fetcher::fetch_chunk`] and written
    /// back with `put_cached_blob`.
    ///
    /// # Errors
    ///
    /// Propagates store errors and fetch failures.
    pub async fn fetch_chunk_with_store(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        hash: &[u8; 32],
    ) -> Result<Vec<u8>> {
        if let Some(data) = store.get_chunk(hash)? {
            return Ok(data);
        }
        let hash_hex = to_hex(hash);
        let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
        store.put_cached_blob(&data)?;
        Ok(data)
    }

    /// Sequentially fetches the tree rooted at `root_hash`, built with `Cid::public`.
    ///
    /// Returns `(chunks fetched, bytes fetched)`.
    pub async fn fetch_tree(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_hash: &[u8; 32],
    ) -> Result<(usize, u64)> {
        self.fetch_cid_tree(store, webrtc_state, &Cid::public(*root_hash))
            .await
    }

    /// Sequentially fetches the tree rooted at `root_cid`.
    ///
    /// Returns `(chunks fetched, bytes fetched)`.
    pub async fn fetch_cid_tree(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_cid: &Cid,
    ) -> Result<(usize, u64)> {
        self.fetch_cid_tree_with_progress(store, webrtc_state, root_cid, None)
            .await
    }

    /// Sequentially fetches the tree rooted at `root_cid`, one request at a
    /// time, reporting into `progress` when given.
    ///
    /// Returns `(chunks fetched, bytes fetched)`.
    pub async fn fetch_cid_tree_with_progress(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_cid: &Cid,
        progress: Option<&FetchProgress>,
    ) -> Result<(usize, u64)> {
        self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, 1, progress)
            .await
    }

    /// Fetches the tree rooted at `root_hash` (built with `Cid::public`) with
    /// up to `concurrency` requests in flight.
    ///
    /// Returns `(chunks fetched, bytes fetched)`.
    pub async fn fetch_tree_parallel(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_hash: &[u8; 32],
        concurrency: usize,
    ) -> Result<(usize, u64)> {
        self.fetch_cid_tree_parallel(store, webrtc_state, &Cid::public(*root_hash), concurrency)
            .await
    }

    /// Fetches the tree rooted at `root_cid` with up to `concurrency` requests
    /// in flight.
    ///
    /// Returns `(chunks fetched, bytes fetched)`.
    pub async fn fetch_cid_tree_parallel(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_cid: &Cid,
        concurrency: usize,
    ) -> Result<(usize, u64)> {
        self.fetch_cid_tree_parallel_with_progress(store, webrtc_state, root_cid, concurrency, None)
            .await
    }

    /// Walks the tree rooted at `root_cid` breadth-first, downloading every
    /// chunk missing from `store` with up to `concurrency` requests in flight.
    ///
    /// Each hash is visited at most once.
    ///
    /// - **Local chunks.** A chunk is local when `blob_exists` reports it;
    ///   an error from that check counts as "absent". Local chunks are parsed
    ///   to discover their children but are not counted.
    /// - **Missing chunks.** These are retrieved through
    ///   [`Fetcher::fetch_chunk`] and written with `put_cached_blob`. They are
    ///   recorded in `progress` when one is given, then parsed with
    ///   `HashTree::get_node` so their children (derived via `child_cid`) can
    ///   be queued.
    /// - **Failed chunks.** A chunk that cannot be fetched is logged at debug
    ///   level and skipped. Its subtree is not traversed and the call still
    ///   succeeds, so callers needing the whole tree should check the store
    ///   afterwards.
    ///
    /// A `concurrency` of 0 is treated as 1.
    ///
    /// Returns `(chunks fetched, bytes fetched)` by this call.
    ///
    /// # Errors
    ///
    /// Propagates errors from `put_cached_blob` and `get_node`, aborting the walk.
    pub async fn fetch_cid_tree_parallel_with_progress(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        root_cid: &Cid,
        concurrency: usize,
        progress: Option<&FetchProgress>,
    ) -> Result<(usize, u64)> {
        use futures::stream::{FuturesUnordered, StreamExt};

        // With 0 the scheduling loop below would never start a request and the
        // walk would silently return (0, 0) without fetching anything.
        let concurrency = concurrency.max(1);

        // The counters never leave this function, so plain integers suffice.
        let mut chunks_fetched: usize = 0;
        let mut bytes_fetched: u64 = 0;

        let mut queued: HashSet<[u8; 32]> = HashSet::new();
        let mut pending: VecDeque<Cid> = VecDeque::new();
        queued.insert(root_cid.hash);
        pending.push_back(root_cid.clone());

        let mut active = FuturesUnordered::new();
        let tree = HashTree::new(HashTreeConfig::new(store.store_arc()).public());

        loop {
            // Top up the in-flight set. Chunks already stored are expanded
            // immediately and do not occupy a request slot.
            while active.len() < concurrency {
                let Some(cid) = pending.pop_front() else {
                    break;
                };
                if store.blob_exists(&cid.hash).unwrap_or(false) {
                    if let Some(node) = tree.get_node(&cid).await? {
                        Self::enqueue_unseen(
                            node.links.iter().map(|link| child_cid(&cid, link)),
                            &mut queued,
                            &mut pending,
                        );
                    }
                    continue;
                }
                active.push(async move {
                    let hash_hex = to_hex(&cid.hash);
                    let result = self.fetch_chunk(webrtc_state, &hash_hex).await;
                    (cid, result)
                });
            }

            // `None` means nothing is in flight and nothing was pending: done.
            let Some((cid, result)) = active.next().await else {
                break;
            };
            match result {
                Ok(data) => {
                    store.put_cached_blob(&data)?;
                    let byte_len = data.len();
                    if let Some(progress) = progress {
                        progress.record_chunk(byte_len);
                    }
                    chunks_fetched += 1;
                    bytes_fetched += byte_len as u64;
                    if let Some(node) = tree.get_node(&cid).await? {
                        Self::enqueue_unseen(
                            node.links.iter().map(|link| child_cid(&cid, link)),
                            &mut queued,
                            &mut pending,
                        );
                    }
                }
                Err(e) => {
                    debug!("Failed to fetch {}: {}", to_hex(&cid.hash), e);
                }
            }
        }

        Ok((chunks_fetched, bytes_fetched))
    }

    /// Appends each of `children` whose hash is not yet in `queued` to `pending`.
    fn enqueue_unseen(
        children: impl IntoIterator<Item = Cid>,
        queued: &mut HashSet<[u8; 32]>,
        pending: &mut VecDeque<Cid>,
    ) {
        for child in children {
            if queued.insert(child.hash) {
                pending.push_back(child);
            }
        }
    }

    /// Returns the file content for `hash`, fetching its tree first if the
    /// store cannot assemble it.
    ///
    /// Returns `Ok(None)` when the content is still unavailable after the
    /// fetch, for example because some chunks could not be retrieved.
    pub async fn fetch_file(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        hash: &[u8; 32],
    ) -> Result<Option<Vec<u8>>> {
        if let Some(content) = store.get_file(hash)? {
            return Ok(Some(content));
        }
        self.fetch_tree(store, webrtc_state, hash).await?;
        store.get_file(hash)
    }

    /// Returns the directory listing for `hash`, fetching its tree first if
    /// the store cannot produce it.
    ///
    /// An error from the initial cached lookup is ignored and treated like a
    /// miss; only the lookup after the fetch propagates errors.
    pub async fn fetch_directory(
        &self,
        store: &HashtreeStore,
        webrtc_state: Option<&Arc<WebRTCState>>,
        hash: &[u8; 32],
    ) -> Result<Option<crate::storage::DirectoryListing>> {
        if let Ok(Some(listing)) = store.get_directory_listing(hash) {
            return Ok(Some(listing));
        }
        self.fetch_tree(store, webrtc_state, hash).await?;
        store.get_directory_listing(hash)
    }

    /// Uploads `data` via `BlossomClient::upload` and returns its string
    /// result (presumably the blob hash or URL; see the Blossom client).
    pub async fn upload(&self, data: &[u8]) -> Result<String> {
        self.blossom
            .upload(data)
            .await
            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
    }

    /// Uploads `data` via `BlossomClient::upload_if_missing`.
    ///
    /// The boolean presumably reports whether an upload actually happened;
    /// confirm against the Blossom client.
    pub async fn upload_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
        self.blossom
            .upload_if_missing(data)
            .await
            .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
    }
}
/// Returns `blossom` with the locally detected hashtree daemon, if any,
/// prepended to its read-server list.
///
/// Detection is done by `detect_local_daemon_url`, seeded with the CLI
/// config's `server.bind_address` when the config loads; a load failure is
/// ignored. The client comes back unchanged in two cases:
/// - no daemon is detected;
/// - its URL is already one of the read servers.
fn with_local_daemon_read(blossom: BlossomClient) -> BlossomClient {
    let configured_bind = CliConfig::load()
        .ok()
        .map(|cfg| cfg.server.bind_address);
    match detect_local_daemon_url(configured_bind.as_deref()) {
        Some(local_url)
            if !blossom
                .read_servers()
                .iter()
                .any(|server| server == &local_url) =>
        {
            // Inserted at the front, presumably so the local daemon is tried
            // before remote servers.
            let mut servers = blossom.read_servers().to_vec();
            servers.insert(0, local_url);
            blossom.with_read_servers(servers)
        }
        _ => blossom,
    }
}