use anyhow::Result;
use hashtree_core::{decode_tree_node, to_hex};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tracing::debug;

use crate::storage::HashtreeStore;
use crate::webrtc::WebRTCState;
17
/// Settings that control which remote sources are queried for content and
/// how long each attempt may take.
///
/// `Debug` is derived so a configuration can be logged or shown in assertion
/// output; all fields are plain standard-library types.
#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Base URLs of the Blossom servers to query. Servers are tried in list
    /// order and a trailing `/` on an entry is tolerated.
    pub blossom_servers: Vec<String>,
    /// Upper bound on how long a single WebRTC peer request may take before
    /// the fetcher moves on to the Blossom servers.
    pub webrtc_timeout: Duration,
    /// Upper bound on how long sending a request to one Blossom server may
    /// take before the next server is tried.
    pub blossom_timeout: Duration,
}
28
29impl Default for FetchConfig {
30 fn default() -> Self {
31 Self {
32 blossom_servers: vec!["https://blossom.iris.to".to_string()],
33 webrtc_timeout: Duration::from_millis(2000),
34 blossom_timeout: Duration::from_millis(10000),
35 }
36 }
37}
38
39pub struct Fetcher {
41 config: FetchConfig,
42 http_client: reqwest::Client,
43}
44
45impl Fetcher {
46 pub fn new(config: FetchConfig) -> Self {
48 Self {
49 config,
50 http_client: reqwest::Client::builder()
51 .timeout(Duration::from_secs(30))
52 .build()
53 .unwrap(),
54 }
55 }
56
57 pub async fn fetch_chunk(
59 &self,
60 webrtc_state: Option<&Arc<WebRTCState>>,
61 hash_hex: &str,
62 ) -> Result<Vec<u8>> {
63 let short_hash = if hash_hex.len() >= 12 {
64 &hash_hex[..12]
65 } else {
66 hash_hex
67 };
68
69 if let Some(state) = webrtc_state {
71 debug!("Trying WebRTC for {}", short_hash);
72 let webrtc_result = tokio::time::timeout(
73 self.config.webrtc_timeout,
74 state.request_from_peers(hash_hex),
75 )
76 .await;
77
78 if let Ok(Some(data)) = webrtc_result {
79 debug!("Got {} from WebRTC ({} bytes)", short_hash, data.len());
80 return Ok(data);
81 }
82 }
83
84 for server in &self.config.blossom_servers {
86 let url = format!("{}/{}", server.trim_end_matches('/'), hash_hex);
87 debug!("Trying Blossom {} for {}", server, short_hash);
88
89 let result = tokio::time::timeout(
90 self.config.blossom_timeout,
91 self.http_client.get(&url).send(),
92 )
93 .await;
94
95 match result {
96 Ok(Ok(response)) if response.status().is_success() => {
97 if let Ok(data) = response.bytes().await {
98 debug!("Got {} from Blossom ({} bytes)", short_hash, data.len());
99 return Ok(data.to_vec());
100 }
101 }
102 Ok(Ok(response)) => {
103 debug!("Blossom {} returned {} for {}", server, response.status(), short_hash);
104 }
105 Ok(Err(e)) => {
106 debug!("Blossom {} error for {}: {}", server, short_hash, e);
107 }
108 Err(_) => {
109 debug!("Blossom {} timeout for {}", server, short_hash);
110 }
111 }
112 }
113
114 Err(anyhow::anyhow!("Failed to fetch {} from any source", short_hash))
115 }
116
117 pub async fn fetch_chunk_with_store(
119 &self,
120 store: &HashtreeStore,
121 webrtc_state: Option<&Arc<WebRTCState>>,
122 hash_hex: &str,
123 ) -> Result<Vec<u8>> {
124 if let Some(data) = store.get_chunk(hash_hex)? {
126 return Ok(data);
127 }
128
129 let data = self.fetch_chunk(webrtc_state, hash_hex).await?;
131 store.put_blob(&data)?;
132 Ok(data)
133 }
134
135 pub async fn fetch_tree(
138 &self,
139 store: &HashtreeStore,
140 webrtc_state: Option<&Arc<WebRTCState>>,
141 root_hash: &[u8; 32],
142 ) -> Result<(usize, u64)> {
143 let mut chunks_fetched = 0usize;
144 let mut bytes_fetched = 0u64;
145
146 let root_hex = to_hex(root_hash);
147
148 if store.blob_exists(&root_hex)? {
150 return Ok((0, 0));
151 }
152
153 let mut queue: VecDeque<[u8; 32]> = VecDeque::new();
155 queue.push_back(*root_hash);
156
157 while let Some(hash) = queue.pop_front() {
158 let hash_hex = to_hex(&hash);
159
160 if store.blob_exists(&hash_hex)? {
162 continue;
163 }
164
165 let data = self.fetch_chunk(webrtc_state, &hash_hex).await?;
167
168 store.put_blob(&data)?;
170 chunks_fetched += 1;
171 bytes_fetched += data.len() as u64;
172
173 if let Ok(node) = decode_tree_node(&data) {
175 for link in node.links {
176 queue.push_back(link.hash);
177 }
178 }
179 }
180
181 Ok((chunks_fetched, bytes_fetched))
182 }
183
184 pub async fn fetch_file(
187 &self,
188 store: &HashtreeStore,
189 webrtc_state: Option<&Arc<WebRTCState>>,
190 hash_hex: &str,
191 ) -> Result<Option<Vec<u8>>> {
192 if let Some(content) = store.get_file(hash_hex)? {
194 return Ok(Some(content));
195 }
196
197 let hash = hashtree_core::from_hex(hash_hex)
199 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
200
201 self.fetch_tree(store, webrtc_state, &hash).await?;
203
204 store.get_file(hash_hex)
206 }
207
208 pub async fn fetch_directory(
210 &self,
211 store: &HashtreeStore,
212 webrtc_state: Option<&Arc<WebRTCState>>,
213 hash_hex: &str,
214 ) -> Result<Option<crate::storage::DirectoryListing>> {
215 if let Ok(Some(listing)) = store.get_directory_listing(hash_hex) {
217 return Ok(Some(listing));
218 }
219
220 let hash = hashtree_core::from_hex(hash_hex)
222 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
223
224 self.fetch_tree(store, webrtc_state, &hash).await?;
226
227 store.get_directory_listing(hash_hex)
229 }
230}