1use crate::error::{CascError, Result};
4use crate::types::EKey;
5use async_trait::async_trait;
6use std::collections::VecDeque;
7use std::sync::{Arc, Weak};
8use std::time::{Duration, Instant};
9use tokio::sync::{RwLock, mpsc, oneshot};
10use tracing::{debug, info, trace, warn};
11
/// Default chunk size in bytes (256 KiB).
const DEFAULT_CHUNK_SIZE: usize = 256 * 1024;

/// Default maximum number of chunks prefetched after a read.
const DEFAULT_MAX_PREFETCH_CHUNKS: usize = 4;

/// Default value for [`ProgressiveConfig::chunk_timeout`].
const DEFAULT_CHUNK_TIMEOUT: Duration = Duration::from_secs(30);

/// Default size threshold (1 MiB) a size hint must exceed to use progressive loading.
const DEFAULT_MIN_PROGRESSIVE_SIZE: usize = 1 << 20;

/// Tuning knobs for progressive (chunk-by-chunk) file loading.
#[derive(Clone, Debug)]
pub struct ProgressiveConfig {
    /// Size in bytes of each chunk requested from the loader.
    pub chunk_size: usize,
    /// Maximum number of follow-up chunks prefetched after a read.
    pub max_prefetch_chunks: usize,
    /// Intended per-chunk load timeout.
    // NOTE(review): nothing in this module reads this value — confirm it is enforced elsewhere.
    pub chunk_timeout: Duration,
    /// Whether reads trigger prefetching of predicted follow-up chunks.
    pub use_predictive_prefetch: bool,
    /// Size in bytes a size hint must exceed before progressive loading is recommended.
    pub min_progressive_size: usize,
}

impl Default for ProgressiveConfig {
    /// Builds the configuration from the module-level `DEFAULT_*` constants, with
    /// predictive prefetch enabled.
    fn default() -> Self {
        ProgressiveConfig {
            chunk_size: DEFAULT_CHUNK_SIZE,
            max_prefetch_chunks: DEFAULT_MAX_PREFETCH_CHUNKS,
            chunk_timeout: DEFAULT_CHUNK_TIMEOUT,
            use_predictive_prefetch: true,
            min_progressive_size: DEFAULT_MIN_PROGRESSIVE_SIZE,
        }
    }
}
47
48#[derive(Debug, Clone, Copy)]
50pub enum SizeHint {
51 Exact(u64),
53 Estimated { size: u64, confidence: f32 },
55 Minimum(u64),
57 Unknown,
59}
60
61impl SizeHint {
62 pub fn suggested_initial_size(&self) -> Option<usize> {
64 match self {
65 SizeHint::Exact(size) => Some(*size as usize),
66 SizeHint::Estimated { size, confidence } if *confidence > 0.7 => Some(*size as usize),
67 SizeHint::Minimum(size) => Some(*size as usize),
68 _ => None,
69 }
70 }
71
72 pub fn should_use_progressive(&self, config: &ProgressiveConfig) -> bool {
74 match self {
75 SizeHint::Exact(size) | SizeHint::Minimum(size) => {
76 *size as usize > config.min_progressive_size
77 }
78 SizeHint::Estimated { size, confidence } => {
79 *size as usize > config.min_progressive_size && *confidence > 0.5
80 }
81 SizeHint::Unknown => false,
82 }
83 }
84}
85
/// Short history of chunk accesses for one progressive file, used for prefetch prediction
/// and idle detection.
#[derive(Debug, Default)]
struct AccessPattern {
    /// Most recently accessed chunk indices, oldest first; at most `HISTORY_LEN` entries.
    sequential_reads: VecDeque<usize>,
    /// Time of the most recent `record_access` call.
    last_access: Option<Instant>,
    /// Smoothed interval between accesses: each new interval is averaged with the previous value.
    avg_interval: Option<Duration>,
}

impl AccessPattern {
    /// Maximum number of chunk indices kept in `sequential_reads`.
    const HISTORY_LEN: usize = 10;

    /// Records an access to `chunk_index` at the current instant.
    fn record_access(&mut self, chunk_index: usize) {
        let now = Instant::now();

        if let Some(last) = self.last_access {
            let interval = now.duration_since(last);
            self.avg_interval = Some(match self.avg_interval {
                // Halving the sum makes older intervals decay geometrically; clamp to u64 nanos.
                Some(avg) => {
                    let nanos = (avg.as_nanos() + interval.as_nanos()) / 2;
                    Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX))
                }
                None => interval,
            });
        }

        self.last_access = Some(now);
        self.sequential_reads.push_back(chunk_index);
        while self.sequential_reads.len() > Self::HISTORY_LEN {
            self.sequential_reads.pop_front();
        }
    }

    /// Predicts up to `max_predictions` chunks likely to be read after `current_chunk`.
    ///
    /// Every access pattern currently yields the same prediction — the chunks immediately
    /// following `current_chunk` — so the history is not consulted. Indices that would
    /// overflow `usize` are omitted instead of panicking.
    fn predict_next_chunks(&self, current_chunk: usize, max_predictions: usize) -> Vec<usize> {
        (1..=max_predictions)
            .filter_map(|step| current_chunk.checked_add(step))
            .collect()
    }
}
147
/// One cached piece of a progressive file, as returned by the chunk loader.
#[derive(Clone, Debug)]
pub struct ProgressiveChunk {
    /// Zero-based chunk number; the chunk starts at byte `index * chunk_size`.
    pub index: usize,
    /// Raw bytes of the chunk.
    pub data: Vec<u8>,
    /// Length of `data` in bytes.
    pub size: usize,
    /// Set when the loader returned fewer bytes than a full chunk.
    pub is_final: bool,
}
160
161#[derive(Debug)]
163pub struct ProgressiveFile {
164 #[allow(dead_code)]
166 ekey: EKey,
167 size_hint: SizeHint,
169 config: ProgressiveConfig,
171 chunks: Arc<RwLock<std::collections::HashMap<usize, ProgressiveChunk>>>,
173 position: Arc<RwLock<u64>>,
175 access_pattern: Arc<RwLock<AccessPattern>>,
177 chunk_request_tx: mpsc::UnboundedSender<ChunkRequest>,
179 stats: Arc<RwLock<LoadingStats>>,
181}
182
/// Counters describing how a progressive file's chunks were served and loaded.
#[derive(Clone, Debug, Default)]
pub struct LoadingStats {
    /// Number of chunks fetched from the chunk loader.
    pub chunks_loaded: usize,
    /// Total bytes fetched from the chunk loader.
    pub bytes_loaded: u64,
    /// Chunk lookups answered from the in-memory cache.
    pub cache_hits: usize,
    /// Chunk lookups that had to wait for a load.
    pub cache_misses: usize,
    /// Accumulated load time; `avg_chunk_load_time` is derived from it.
    pub total_load_time: Duration,
    /// `total_load_time` divided by `chunks_loaded`, refreshed after every chunk load.
    pub avg_chunk_load_time: Duration,
    /// Presumably: prefetched chunks that were later read.
    // NOTE(review): nothing in this module increments this counter — confirm it is still needed.
    pub prefetch_hits: usize,
    /// Presumably: prefetched chunks that were never read.
    // NOTE(review): nothing in this module increments this counter — confirm it is still needed.
    pub prefetch_misses: usize,
}
203
204#[derive(Debug)]
206struct ChunkRequest {
207 chunk_index: usize,
209 priority: ChunkPriority,
211 response_tx: oneshot::Sender<Result<()>>,
213}
214
/// Priority of a chunk request; later variants compare greater (`Prefetch < Normal < Urgent`).
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
enum ChunkPriority {
    /// Speculative load ahead of the reader.
    Prefetch = 1,
    /// Middle priority level.
    // `dead_code` is allowed because nothing in this module constructs this variant.
    #[allow(dead_code)]
    Normal = 2,
    /// Load needed by a read that is waiting on it.
    Urgent = 3,
}
226
227impl ProgressiveFile {
228 pub fn new(
230 ekey: EKey,
231 size_hint: SizeHint,
232 config: ProgressiveConfig,
233 loader: Weak<dyn ChunkLoader + Send + Sync>,
234 ) -> Self {
235 let (chunk_request_tx, chunk_request_rx) = mpsc::unbounded_channel();
236 let chunks = Arc::new(RwLock::new(std::collections::HashMap::new()));
237 let stats = Arc::new(RwLock::new(LoadingStats::default()));
238
239 let loader_chunks = Arc::clone(&chunks);
241 let loader_stats = Arc::clone(&stats);
242 let loader_config = config.clone();
243
244 tokio::spawn(async move {
245 Self::chunk_loader_task(
246 ekey,
247 loader,
248 chunk_request_rx,
249 loader_chunks,
250 loader_stats,
251 loader_config,
252 )
253 .await;
254 });
255
256 Self {
257 ekey,
258 size_hint,
259 config,
260 chunks,
261 position: Arc::new(RwLock::new(0)),
262 access_pattern: Arc::new(RwLock::new(AccessPattern::default())),
263 chunk_request_tx,
264 stats,
265 }
266 }
267
268 pub async fn read(&self, offset: u64, length: usize) -> Result<Vec<u8>> {
270 let start_time = Instant::now();
271 trace!("Progressive read: offset={}, length={}", offset, length);
272
273 {
275 let mut pos = self.position.write().await;
276 *pos = offset;
277 }
278
279 let chunk_size = self.config.chunk_size as u64;
280 let start_chunk = (offset / chunk_size) as usize;
281 let end_chunk = ((offset + length as u64 - 1) / chunk_size) as usize;
282
283 let mut result = Vec::with_capacity(length);
284
285 for chunk_index in start_chunk..=end_chunk {
286 {
288 let mut pattern = self.access_pattern.write().await;
289 pattern.record_access(chunk_index);
290 }
291
292 let chunk_data = {
294 let chunks = self.chunks.read().await;
295 chunks.get(&chunk_index).map(|chunk| chunk.data.clone())
296 };
297
298 let chunk_data = if let Some(data) = chunk_data {
299 {
301 let mut stats = self.stats.write().await;
302 stats.cache_hits += 1;
303 }
304 trace!("Cache hit for chunk {}", chunk_index);
305 data
306 } else {
307 {
309 let mut stats = self.stats.write().await;
310 stats.cache_misses += 1;
311 }
312
313 trace!("Cache miss for chunk {}, loading...", chunk_index);
314 self.load_chunk(chunk_index, ChunkPriority::Urgent).await?;
315
316 let chunks = self.chunks.read().await;
317 chunks
318 .get(&chunk_index)
319 .ok_or_else(|| {
320 CascError::InvalidArchiveFormat("Chunk failed to load".to_string())
321 })?
322 .data
323 .clone()
324 };
325
326 let chunk_start_offset = chunk_index as u64 * chunk_size;
328 let chunk_end_offset = chunk_start_offset + chunk_data.len() as u64;
329
330 let read_start = offset.max(chunk_start_offset);
331 let read_end = (offset + length as u64).min(chunk_end_offset);
332
333 if read_start < read_end {
334 let chunk_read_start = (read_start - chunk_start_offset) as usize;
335 let chunk_read_end = (read_end - chunk_start_offset) as usize;
336
337 result.extend_from_slice(&chunk_data[chunk_read_start..chunk_read_end]);
338 }
339 }
340
341 {
343 let mut stats = self.stats.write().await;
344 stats.total_load_time += start_time.elapsed();
345 }
346
347 if self.config.use_predictive_prefetch {
349 self.trigger_predictive_prefetch(end_chunk).await;
350 }
351
352 debug!(
353 "Progressive read completed: offset={}, length={}, chunks={}..={}",
354 offset, length, start_chunk, end_chunk
355 );
356
357 Ok(result)
358 }
359
360 async fn load_chunk(&self, chunk_index: usize, priority: ChunkPriority) -> Result<()> {
362 let (response_tx, response_rx) = oneshot::channel();
363
364 let request = ChunkRequest {
365 chunk_index,
366 priority,
367 response_tx,
368 };
369
370 self.chunk_request_tx
371 .send(request)
372 .map_err(|_| CascError::InvalidArchiveFormat("Chunk loader unavailable".to_string()))?;
373
374 response_rx
375 .await
376 .map_err(|_| CascError::InvalidArchiveFormat("Chunk load failed".to_string()))?
377 }
378
379 async fn trigger_predictive_prefetch(&self, last_accessed_chunk: usize) {
381 let predictions = {
382 let pattern = self.access_pattern.read().await;
383 pattern.predict_next_chunks(last_accessed_chunk, self.config.max_prefetch_chunks)
384 };
385
386 trace!("Predictive prefetch suggestions: {:?}", predictions);
387
388 for chunk_index in predictions {
389 let already_loaded = {
391 let chunks = self.chunks.read().await;
392 chunks.contains_key(&chunk_index)
393 };
394
395 if !already_loaded {
396 let _ = self.load_chunk(chunk_index, ChunkPriority::Prefetch).await;
397 }
398 }
399 }
400
401 pub async fn get_stats(&self) -> LoadingStats {
403 self.stats.read().await.clone()
404 }
405
406 pub fn get_size_hint(&self) -> SizeHint {
408 self.size_hint
409 }
410
411 pub async fn is_fully_loaded(&self) -> bool {
413 if let SizeHint::Exact(size) = self.size_hint {
414 let chunks = self.chunks.read().await;
415 let chunk_size = self.config.chunk_size as u64;
416 let expected_chunks = size.div_ceil(chunk_size) as usize;
417
418 chunks.len() == expected_chunks && chunks.values().any(|chunk| chunk.is_final)
419 } else {
420 false
421 }
422 }
423
424 async fn chunk_loader_task(
426 ekey: EKey,
427 loader: Weak<dyn ChunkLoader + Send + Sync>,
428 mut request_rx: mpsc::UnboundedReceiver<ChunkRequest>,
429 chunks: Arc<RwLock<std::collections::HashMap<usize, ProgressiveChunk>>>,
430 stats: Arc<RwLock<LoadingStats>>,
431 config: ProgressiveConfig,
432 ) {
433 debug!("Started chunk loader task for {}", ekey);
434
435 let mut pending_requests: Vec<ChunkRequest> = Vec::new();
437
438 while let Some(request) = request_rx.recv().await {
439 pending_requests.push(request);
440
441 pending_requests.sort_by(|a, b| b.priority.cmp(&a.priority));
443
444 if let Some(request) = pending_requests.pop() {
446 let load_result = if let Some(loader_arc) = loader.upgrade() {
447 Self::load_single_chunk(
448 loader_arc,
449 ekey,
450 request.chunk_index,
451 &chunks,
452 &stats,
453 &config,
454 )
455 .await
456 } else {
457 warn!("Chunk loader has been dropped, stopping chunk loading");
458 break;
459 };
460
461 let _ = request.response_tx.send(load_result);
462 }
463 }
464
465 debug!("Chunk loader task completed for {}", ekey);
466 }
467
468 async fn load_single_chunk(
470 loader: Arc<dyn ChunkLoader + Send + Sync>,
471 ekey: EKey,
472 chunk_index: usize,
473 chunks: &Arc<RwLock<std::collections::HashMap<usize, ProgressiveChunk>>>,
474 stats: &Arc<RwLock<LoadingStats>>,
475 config: &ProgressiveConfig,
476 ) -> Result<()> {
477 let start_time = Instant::now();
478 trace!("Loading chunk {} for {}", chunk_index, ekey);
479
480 {
482 let chunks_guard = chunks.read().await;
483 if chunks_guard.contains_key(&chunk_index) {
484 trace!("Chunk {} already loaded", chunk_index);
485 return Ok(());
486 }
487 }
488
489 let chunk_offset = chunk_index as u64 * config.chunk_size as u64;
490
491 match loader
492 .load_chunk(ekey, chunk_offset, config.chunk_size)
493 .await
494 {
495 Ok(chunk_data) => {
496 let is_final = chunk_data.len() < config.chunk_size;
497 let chunk_size = chunk_data.len();
498 let chunk = ProgressiveChunk {
499 index: chunk_index,
500 size: chunk_size,
501 is_final,
502 data: chunk_data,
503 };
504
505 {
507 let mut chunks_guard = chunks.write().await;
508 chunks_guard.insert(chunk_index, chunk.clone());
509 }
510
511 {
513 let mut stats_guard = stats.write().await;
514 stats_guard.chunks_loaded += 1;
515 stats_guard.bytes_loaded += chunk_size as u64;
516 let load_time = start_time.elapsed();
517 stats_guard.total_load_time += load_time;
518 stats_guard.avg_chunk_load_time =
519 stats_guard.total_load_time / stats_guard.chunks_loaded as u32;
520 }
521
522 trace!(
523 "Loaded chunk {} ({} bytes) for {} in {:?}",
524 chunk_index,
525 chunk.size,
526 ekey,
527 start_time.elapsed()
528 );
529
530 Ok(())
531 }
532 Err(e) => {
533 warn!("Failed to load chunk {} for {}: {}", chunk_index, ekey, e);
534 Err(e)
535 }
536 }
537 }
538}
539
/// Source of raw chunk bytes for progressive files.
///
/// `ProgressiveFile` calls this with `size` set to the configured chunk size and
/// `offset = chunk_index * chunk_size`. A returned buffer shorter than `size` is recorded as
/// the file's final chunk (`ProgressiveChunk::is_final`).
#[async_trait]
pub trait ChunkLoader {
    /// Loads `size` bytes (fewer near the end of the data) of the file `ekey`, starting at
    /// byte `offset`.
    async fn load_chunk(&self, ekey: EKey, offset: u64, size: usize) -> Result<Vec<u8>>;
}
546
547pub struct ProgressiveFileManager {
549 config: ProgressiveConfig,
551 active_files: Arc<RwLock<std::collections::HashMap<EKey, Arc<ProgressiveFile>>>>,
553 chunk_loader: Arc<dyn ChunkLoader + Send + Sync>,
555}
556
557impl ProgressiveFileManager {
558 pub fn new(
560 config: ProgressiveConfig,
561 chunk_loader: Arc<dyn ChunkLoader + Send + Sync>,
562 ) -> Self {
563 Self {
564 config,
565 active_files: Arc::new(RwLock::new(std::collections::HashMap::new())),
566 chunk_loader,
567 }
568 }
569
570 pub async fn get_or_create_progressive_file(
572 &self,
573 ekey: EKey,
574 size_hint: SizeHint,
575 ) -> Arc<ProgressiveFile> {
576 {
578 let active_files = self.active_files.read().await;
579 if let Some(file) = active_files.get(&ekey) {
580 return Arc::clone(file);
581 }
582 }
583
584 let progressive_file = Arc::new(ProgressiveFile::new(
586 ekey,
587 size_hint,
588 self.config.clone(),
589 Arc::downgrade(&self.chunk_loader),
590 ));
591
592 {
594 let mut active_files = self.active_files.write().await;
595 active_files.insert(ekey, Arc::clone(&progressive_file));
596 }
597
598 info!(
599 "Created progressive file for {} with hint {:?}",
600 ekey, size_hint
601 );
602 progressive_file
603 }
604
605 pub async fn remove_progressive_file(&self, ekey: &EKey) {
607 let mut active_files = self.active_files.write().await;
608 active_files.remove(ekey);
609 }
610
611 pub async fn get_global_stats(&self) -> Vec<(EKey, LoadingStats)> {
613 let active_files = self.active_files.read().await;
614 let mut stats = Vec::new();
615
616 for (ekey, file) in active_files.iter() {
617 let file_stats = file.get_stats().await;
618 stats.push((*ekey, file_stats));
619 }
620
621 stats
622 }
623
624 pub async fn cleanup_inactive_files(&self, max_idle_time: Duration) {
626 let now = Instant::now();
627 let mut to_remove = Vec::new();
628
629 {
630 let active_files = self.active_files.read().await;
631 for (ekey, file) in active_files.iter() {
632 let pattern = file.access_pattern.read().await;
633 if let Some(last_access) = pattern.last_access {
634 if now.duration_since(last_access) > max_idle_time {
635 to_remove.push(*ekey);
636 }
637 }
638 }
639 }
640
641 if !to_remove.is_empty() {
642 let mut active_files = self.active_files.write().await;
643 for ekey in to_remove {
644 active_files.remove(&ekey);
645 trace!("Cleaned up inactive progressive file: {}", ekey);
646 }
647 }
648 }
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::EKey;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// In-memory `ChunkLoader` serving a synthetic file of `total_size` bytes in which byte
    /// `i` has value `i % 256`. It counts every call so tests can observe cache behaviour.
    struct MockChunkLoader {
        total_size: usize,
        call_count: Arc<AtomicUsize>,
    }

    impl MockChunkLoader {
        fn new(total_size: usize) -> Self {
            Self {
                total_size,
                call_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait]
    impl ChunkLoader for MockChunkLoader {
        async fn load_chunk(&self, _ekey: EKey, offset: u64, size: usize) -> Result<Vec<u8>> {
            // Count every call, including requests past the end of the data.
            self.call_count.fetch_add(1, Ordering::SeqCst);

            let start = offset as usize;
            let end = (start + size).min(self.total_size);

            // Past the end of the data: an empty buffer, not an error.
            if start >= self.total_size {
                return Ok(Vec::new());
            }

            let data: Vec<u8> = (start..end).map(|i| (i % 256) as u8).collect();

            // Simulated I/O latency.
            tokio::time::sleep(Duration::from_millis(10)).await;

            Ok(data)
        }
    }

    /// A freshly created file reports the size hint it was created with.
    #[tokio::test]
    async fn test_progressive_file_creation() {
        let ekey = EKey::new([1; 16]);
        let size_hint = SizeHint::Exact(1024);
        let config = ProgressiveConfig::default();
        let loader = Arc::new(MockChunkLoader::new(1024));

        let manager = ProgressiveFileManager::new(config, loader);
        let file = manager
            .get_or_create_progressive_file(ekey, size_hint)
            .await;

        assert_eq!(file.get_size_hint().suggested_initial_size(), Some(1024));
    }

    /// Reads within one chunk and across a chunk boundary (512-byte chunks) return the
    /// requested bytes, and the loader stats record the loads.
    #[tokio::test]
    async fn test_progressive_reading() {
        let ekey = EKey::new([2; 16]);
        let total_size = 2048;
        let size_hint = SizeHint::Exact(total_size);
        let config = ProgressiveConfig {
            chunk_size: 512,
            ..ProgressiveConfig::default()
        };
        let loader = Arc::new(MockChunkLoader::new(total_size as usize));

        let manager = ProgressiveFileManager::new(config, loader);
        let file = manager
            .get_or_create_progressive_file(ekey, size_hint)
            .await;

        // Within chunk 0; the mock's byte pattern makes the expected values i % 256.
        let data1 = file.read(0, 256).await.unwrap();
        assert_eq!(data1.len(), 256);
        assert_eq!(data1[0], 0);
        assert_eq!(data1[255], 255);

        // Bytes 400..700 span chunks 0 and 1.
        let data2 = file.read(400, 300).await.unwrap();
        assert_eq!(data2.len(), 300);

        let stats = file.get_stats().await;
        assert!(stats.chunks_loaded > 0);
        assert!(stats.bytes_loaded > 0);
    }

    /// Threshold logic of `should_use_progressive` against the default 1 MiB minimum.
    #[tokio::test]
    async fn test_size_hint_logic() {
        let config = ProgressiveConfig::default();

        assert!(SizeHint::Exact(2_000_000).should_use_progressive(&config));
        assert!(!SizeHint::Exact(500_000).should_use_progressive(&config));

        assert!(
            SizeHint::Estimated {
                size: 2_000_000,
                confidence: 0.8
            }
            .should_use_progressive(&config)
        );

        // Large enough, but confidence is below the 0.5 cutoff.
        assert!(
            !SizeHint::Estimated {
                size: 2_000_000,
                confidence: 0.3
            }
            .should_use_progressive(&config)
        );
    }

    /// A second read inside an already cached chunk must not reach the loader again.
    ///
    /// This relies on `read` awaiting its prefetch pass: by the time the first read returns,
    /// the chunks the second read's prefetch would ask for have already been requested, so the
    /// loader call count stays the same.
    #[tokio::test]
    async fn test_cache_efficiency() {
        let ekey = EKey::new([3; 16]);
        let total_size = 1024;
        let size_hint = SizeHint::Exact(total_size);
        let config = ProgressiveConfig {
            chunk_size: 256,
            ..ProgressiveConfig::default()
        };
        let loader = Arc::new(MockChunkLoader::new(total_size as usize));

        let manager = ProgressiveFileManager::new(config, loader.clone());
        let file = manager
            .get_or_create_progressive_file(ekey, size_hint)
            .await;

        let _data1 = file.read(100, 100).await.unwrap();
        let initial_calls = loader.call_count.load(Ordering::SeqCst);

        let _data2 = file.read(150, 50).await.unwrap();
        let final_calls = loader.call_count.load(Ordering::SeqCst);

        assert_eq!(initial_calls, final_calls);

        let stats = file.get_stats().await;
        assert!(stats.cache_hits > 0);
    }
}