Skip to main content

haagenti_network/
loader.rs

1//! Network loader for fragment streaming
2
3use crate::{
4    CacheConfig, CacheEntry, ClientConfig, FragmentCache, HttpClient, NetworkConfig, NetworkError,
5    PrioritizedFragment, Priority, RangeRequest, Result, Scheduler, SchedulerConfig,
6};
7use bytes::Bytes;
8use haagenti_fragments::FragmentId;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::mpsc;
12use tracing::warn;
13
14/// Request to load a fragment
15#[derive(Debug, Clone)]
16pub struct LoadRequest {
17    /// Fragment ID
18    pub fragment_id: FragmentId,
19    /// CDN path
20    pub path: String,
21    /// Priority
22    pub priority: Priority,
23    /// Expected size (for progress tracking)
24    pub expected_size: Option<u64>,
25    /// Importance score (from ML model)
26    pub importance: f32,
27}
28
29impl LoadRequest {
30    /// Create a new load request
31    pub fn new(fragment_id: FragmentId, path: impl Into<String>) -> Self {
32        Self {
33            fragment_id,
34            path: path.into(),
35            priority: Priority::Normal,
36            expected_size: None,
37            importance: 0.5,
38        }
39    }
40
41    /// Set priority
42    pub fn with_priority(mut self, priority: Priority) -> Self {
43        self.priority = priority;
44        self
45    }
46
47    /// Set expected size
48    pub fn with_expected_size(mut self, size: u64) -> Self {
49        self.expected_size = Some(size);
50        self
51    }
52
53    /// Set importance
54    pub fn with_importance(mut self, importance: f32) -> Self {
55        self.importance = importance;
56        self
57    }
58}
59
60/// Result of loading a fragment
61#[derive(Debug)]
62pub enum LoadResult {
63    /// Successfully loaded
64    Success {
65        fragment_id: FragmentId,
66        data: Bytes,
67        duration: Duration,
68        from_cache: bool,
69    },
70    /// Failed to load
71    Failed {
72        fragment_id: FragmentId,
73        error: NetworkError,
74    },
75}
76
77impl LoadResult {
78    /// Check if successful
79    pub fn is_success(&self) -> bool {
80        matches!(self, LoadResult::Success { .. })
81    }
82
83    /// Get fragment ID
84    pub fn fragment_id(&self) -> FragmentId {
85        match self {
86            LoadResult::Success { fragment_id, .. } => *fragment_id,
87            LoadResult::Failed { fragment_id, .. } => *fragment_id,
88        }
89    }
90}
91
92/// Network loader for fragment streaming
93pub struct NetworkLoader {
94    clients: Vec<HttpClient>,
95    cache: Option<FragmentCache>,
96    scheduler: Scheduler,
97}
98
99impl NetworkLoader {
100    /// Create a new network loader
101    pub async fn new(config: NetworkConfig) -> Result<Self> {
102        // Create HTTP clients for each endpoint
103        let client_config = ClientConfig::from(&config);
104        let mut clients = Vec::new();
105
106        for endpoint in &config.endpoints {
107            let client = HttpClient::new(endpoint.clone(), client_config.clone())?;
108            clients.push(client);
109        }
110
111        if clients.is_empty() {
112            return Err(NetworkError::Configuration(
113                "No CDN endpoints configured".into(),
114            ));
115        }
116
117        // Create cache if configured
118        let cache = if let Some(ref path) = config.cache_dir {
119            let cache_config = CacheConfig {
120                path: path.clone(),
121                max_size: config.max_cache_size,
122                ..Default::default()
123            };
124            Some(FragmentCache::open(cache_config).await?)
125        } else {
126            None
127        };
128
129        let scheduler = Scheduler::new(SchedulerConfig::from(&config));
130
131        Ok(Self {
132            clients,
133            cache,
134            scheduler,
135        })
136    }
137
138    /// Load a single fragment
139    pub async fn load(&self, request: LoadRequest) -> LoadResult {
140        let start = Instant::now();
141
142        // Check cache first
143        if let Some(ref cache) = self.cache {
144            if let Some(data) = cache.get(&request.fragment_id).await {
145                return LoadResult::Success {
146                    fragment_id: request.fragment_id,
147                    data,
148                    duration: start.elapsed(),
149                    from_cache: true,
150                };
151            }
152        }
153
154        // Try each endpoint
155        for client in &self.clients {
156            match client.fetch(&request.path).await {
157                Ok(data) => {
158                    let duration = start.elapsed();
159
160                    // Cache the result
161                    if let Some(ref cache) = self.cache {
162                        let entry = CacheEntry::new(request.fragment_id, data.len() as u64);
163                        if let Err(e) = cache.put(request.fragment_id, data.clone(), entry).await {
164                            warn!("Failed to cache fragment: {:?}", e);
165                        }
166                    }
167
168                    // Record bandwidth
169                    self.scheduler
170                        .record_success(data.len() as u64, duration)
171                        .await;
172
173                    return LoadResult::Success {
174                        fragment_id: request.fragment_id,
175                        data,
176                        duration,
177                        from_cache: false,
178                    };
179                }
180                Err(e) => {
181                    warn!("Endpoint failed: {:?}", e);
182                    continue;
183                }
184            }
185        }
186
187        self.scheduler.record_failure();
188        LoadResult::Failed {
189            fragment_id: request.fragment_id,
190            error: NetworkError::RetriesExhausted("All endpoints failed".into()),
191        }
192    }
193
194    /// Load a range of bytes (for progressive loading)
195    pub async fn load_range(&self, request: LoadRequest, start: u64, end: u64) -> LoadResult {
196        let range = RangeRequest::new(start, end);
197        let start_time = Instant::now();
198
199        for client in &self.clients {
200            match client.fetch_range(&request.path, range.clone()).await {
201                Ok(data) => {
202                    let duration = start_time.elapsed();
203                    self.scheduler
204                        .record_success(data.len() as u64, duration)
205                        .await;
206
207                    return LoadResult::Success {
208                        fragment_id: request.fragment_id,
209                        data,
210                        duration,
211                        from_cache: false,
212                    };
213                }
214                Err(e) => {
215                    warn!("Range request failed: {:?}", e);
216                    continue;
217                }
218            }
219        }
220
221        self.scheduler.record_failure();
222        LoadResult::Failed {
223            fragment_id: request.fragment_id,
224            error: NetworkError::RetriesExhausted("All endpoints failed".into()),
225        }
226    }
227
228    /// Enqueue requests for background loading
229    pub fn enqueue(&self, request: LoadRequest) {
230        let prioritized = PrioritizedFragment::new(request.fragment_id, request.priority)
231            .with_importance(request.importance)
232            .with_size(request.expected_size.unwrap_or(0) as usize);
233
234        self.scheduler.enqueue(prioritized);
235    }
236
237    /// Enqueue multiple requests
238    pub fn enqueue_many(&self, requests: impl IntoIterator<Item = LoadRequest>) {
239        for request in requests {
240            self.enqueue(request);
241        }
242    }
243
244    /// Get scheduler for advanced control
245    pub fn scheduler(&self) -> &Scheduler {
246        &self.scheduler
247    }
248
249    /// Get cache for direct access
250    pub fn cache(&self) -> Option<&FragmentCache> {
251        self.cache.as_ref()
252    }
253
254    /// Sync cache to disk
255    pub async fn sync(&self) -> Result<()> {
256        if let Some(ref cache) = self.cache {
257            cache.sync().await?;
258        }
259        Ok(())
260    }
261}
262
263/// Streaming loader for continuous fragment loading
264pub struct StreamingLoader {
265    loader: Arc<NetworkLoader>,
266    path_prefix: String,
267    rx: mpsc::Receiver<LoadResult>,
268    tx: mpsc::Sender<LoadResult>,
269}
270
271impl StreamingLoader {
272    /// Create a new streaming loader
273    pub fn new(loader: Arc<NetworkLoader>, path_prefix: impl Into<String>, buffer: usize) -> Self {
274        let (tx, rx) = mpsc::channel(buffer);
275        Self {
276            loader,
277            path_prefix: path_prefix.into(),
278            rx,
279            tx,
280        }
281    }
282
283    /// Start loading fragments
284    pub async fn start(&mut self, requests: Vec<LoadRequest>) {
285        for request in requests {
286            let loader = self.loader.clone();
287            let tx = self.tx.clone();
288            let path = format!("{}/{}", self.path_prefix, request.path);
289            let request = LoadRequest { path, ..request };
290
291            tokio::spawn(async move {
292                let result = loader.load(request).await;
293                let _ = tx.send(result).await;
294            });
295        }
296    }
297
298    /// Receive next result
299    pub async fn next(&mut self) -> Option<LoadResult> {
300        self.rx.recv().await
301    }
302
303    /// Receive with timeout
304    pub async fn next_timeout(&mut self, timeout: Duration) -> Option<LoadResult> {
305        tokio::time::timeout(timeout, self.rx.recv())
306            .await
307            .ok()
308            .flatten()
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    #[allow(unused_imports)]
315    use super::*;
316
317    // Integration tests would use wiremock here
318}