1use crate::{
4 CacheConfig, CacheEntry, ClientConfig, FragmentCache, HttpClient, NetworkConfig, NetworkError,
5 PrioritizedFragment, Priority, RangeRequest, Result, Scheduler, SchedulerConfig,
6};
7use bytes::Bytes;
8use haagenti_fragments::FragmentId;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::mpsc;
12use tracing::warn;
13
14#[derive(Debug, Clone)]
16pub struct LoadRequest {
17 pub fragment_id: FragmentId,
19 pub path: String,
21 pub priority: Priority,
23 pub expected_size: Option<u64>,
25 pub importance: f32,
27}
28
29impl LoadRequest {
30 pub fn new(fragment_id: FragmentId, path: impl Into<String>) -> Self {
32 Self {
33 fragment_id,
34 path: path.into(),
35 priority: Priority::Normal,
36 expected_size: None,
37 importance: 0.5,
38 }
39 }
40
41 pub fn with_priority(mut self, priority: Priority) -> Self {
43 self.priority = priority;
44 self
45 }
46
47 pub fn with_expected_size(mut self, size: u64) -> Self {
49 self.expected_size = Some(size);
50 self
51 }
52
53 pub fn with_importance(mut self, importance: f32) -> Self {
55 self.importance = importance;
56 self
57 }
58}
59
60#[derive(Debug)]
62pub enum LoadResult {
63 Success {
65 fragment_id: FragmentId,
66 data: Bytes,
67 duration: Duration,
68 from_cache: bool,
69 },
70 Failed {
72 fragment_id: FragmentId,
73 error: NetworkError,
74 },
75}
76
77impl LoadResult {
78 pub fn is_success(&self) -> bool {
80 matches!(self, LoadResult::Success { .. })
81 }
82
83 pub fn fragment_id(&self) -> FragmentId {
85 match self {
86 LoadResult::Success { fragment_id, .. } => *fragment_id,
87 LoadResult::Failed { fragment_id, .. } => *fragment_id,
88 }
89 }
90}
91
92pub struct NetworkLoader {
94 clients: Vec<HttpClient>,
95 cache: Option<FragmentCache>,
96 scheduler: Scheduler,
97}
98
99impl NetworkLoader {
100 pub async fn new(config: NetworkConfig) -> Result<Self> {
102 let client_config = ClientConfig::from(&config);
104 let mut clients = Vec::new();
105
106 for endpoint in &config.endpoints {
107 let client = HttpClient::new(endpoint.clone(), client_config.clone())?;
108 clients.push(client);
109 }
110
111 if clients.is_empty() {
112 return Err(NetworkError::Configuration(
113 "No CDN endpoints configured".into(),
114 ));
115 }
116
117 let cache = if let Some(ref path) = config.cache_dir {
119 let cache_config = CacheConfig {
120 path: path.clone(),
121 max_size: config.max_cache_size,
122 ..Default::default()
123 };
124 Some(FragmentCache::open(cache_config).await?)
125 } else {
126 None
127 };
128
129 let scheduler = Scheduler::new(SchedulerConfig::from(&config));
130
131 Ok(Self {
132 clients,
133 cache,
134 scheduler,
135 })
136 }
137
138 pub async fn load(&self, request: LoadRequest) -> LoadResult {
140 let start = Instant::now();
141
142 if let Some(ref cache) = self.cache {
144 if let Some(data) = cache.get(&request.fragment_id).await {
145 return LoadResult::Success {
146 fragment_id: request.fragment_id,
147 data,
148 duration: start.elapsed(),
149 from_cache: true,
150 };
151 }
152 }
153
154 for client in &self.clients {
156 match client.fetch(&request.path).await {
157 Ok(data) => {
158 let duration = start.elapsed();
159
160 if let Some(ref cache) = self.cache {
162 let entry = CacheEntry::new(request.fragment_id, data.len() as u64);
163 if let Err(e) = cache.put(request.fragment_id, data.clone(), entry).await {
164 warn!("Failed to cache fragment: {:?}", e);
165 }
166 }
167
168 self.scheduler
170 .record_success(data.len() as u64, duration)
171 .await;
172
173 return LoadResult::Success {
174 fragment_id: request.fragment_id,
175 data,
176 duration,
177 from_cache: false,
178 };
179 }
180 Err(e) => {
181 warn!("Endpoint failed: {:?}", e);
182 continue;
183 }
184 }
185 }
186
187 self.scheduler.record_failure();
188 LoadResult::Failed {
189 fragment_id: request.fragment_id,
190 error: NetworkError::RetriesExhausted("All endpoints failed".into()),
191 }
192 }
193
194 pub async fn load_range(&self, request: LoadRequest, start: u64, end: u64) -> LoadResult {
196 let range = RangeRequest::new(start, end);
197 let start_time = Instant::now();
198
199 for client in &self.clients {
200 match client.fetch_range(&request.path, range.clone()).await {
201 Ok(data) => {
202 let duration = start_time.elapsed();
203 self.scheduler
204 .record_success(data.len() as u64, duration)
205 .await;
206
207 return LoadResult::Success {
208 fragment_id: request.fragment_id,
209 data,
210 duration,
211 from_cache: false,
212 };
213 }
214 Err(e) => {
215 warn!("Range request failed: {:?}", e);
216 continue;
217 }
218 }
219 }
220
221 self.scheduler.record_failure();
222 LoadResult::Failed {
223 fragment_id: request.fragment_id,
224 error: NetworkError::RetriesExhausted("All endpoints failed".into()),
225 }
226 }
227
228 pub fn enqueue(&self, request: LoadRequest) {
230 let prioritized = PrioritizedFragment::new(request.fragment_id, request.priority)
231 .with_importance(request.importance)
232 .with_size(request.expected_size.unwrap_or(0) as usize);
233
234 self.scheduler.enqueue(prioritized);
235 }
236
237 pub fn enqueue_many(&self, requests: impl IntoIterator<Item = LoadRequest>) {
239 for request in requests {
240 self.enqueue(request);
241 }
242 }
243
244 pub fn scheduler(&self) -> &Scheduler {
246 &self.scheduler
247 }
248
249 pub fn cache(&self) -> Option<&FragmentCache> {
251 self.cache.as_ref()
252 }
253
254 pub async fn sync(&self) -> Result<()> {
256 if let Some(ref cache) = self.cache {
257 cache.sync().await?;
258 }
259 Ok(())
260 }
261}
262
263pub struct StreamingLoader {
265 loader: Arc<NetworkLoader>,
266 path_prefix: String,
267 rx: mpsc::Receiver<LoadResult>,
268 tx: mpsc::Sender<LoadResult>,
269}
270
271impl StreamingLoader {
272 pub fn new(loader: Arc<NetworkLoader>, path_prefix: impl Into<String>, buffer: usize) -> Self {
274 let (tx, rx) = mpsc::channel(buffer);
275 Self {
276 loader,
277 path_prefix: path_prefix.into(),
278 rx,
279 tx,
280 }
281 }
282
283 pub async fn start(&mut self, requests: Vec<LoadRequest>) {
285 for request in requests {
286 let loader = self.loader.clone();
287 let tx = self.tx.clone();
288 let path = format!("{}/{}", self.path_prefix, request.path);
289 let request = LoadRequest { path, ..request };
290
291 tokio::spawn(async move {
292 let result = loader.load(request).await;
293 let _ = tx.send(result).await;
294 });
295 }
296 }
297
298 pub async fn next(&mut self) -> Option<LoadResult> {
300 self.rx.recv().await
301 }
302
303 pub async fn next_timeout(&mut self, timeout: Duration) -> Option<LoadResult> {
305 tokio::time::timeout(timeout, self.rx.recv())
306 .await
307 .ok()
308 .flatten()
309 }
310}
311
#[cfg(test)]
mod tests {
    // The module currently contains no tests, so the glob import below would
    // otherwise trigger the `unused_imports` lint.
    #[allow(unused_imports)]
    use super::*;
}