1use std::io::{self, Read, Seek, Write, BufWriter};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use zip::ZipArchive;
5use memmap2::{Mmap, MmapOptions};
6use tempfile::NamedTempFile;
7
/// Tunables that decide how an archive entry is read and how many
/// memory-mapped entries are kept around.
#[derive(Debug, Clone)]
pub struct MmapConfig {
    /// Entries whose uncompressed size is at least this many bytes are
    /// candidates for memory mapping.
    pub threshold: u64,
    /// Upper bound on the number of cached memory maps.
    pub max_maps: usize,
    /// Directory for the temporary backing files; `None` selects the default
    /// temporary-file location.
    pub temp_dir: Option<PathBuf>,
    /// Entries whose uncompressed size is at least this many bytes count as
    /// "huge".
    pub huge_file_threshold: u64,
    /// Size in bytes of the buffer used when an entry is read chunk by chunk.
    pub stream_chunk_size: usize,
    /// Whether huge entries are handled by the streaming routines.
    pub enable_streaming: bool,
}

impl Default for MmapConfig {
    /// Defaults: map from 1 MiB, keep 8 maps, treat 100 MiB and above as huge,
    /// stream in 8 MiB chunks, streaming enabled.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;

        let threshold = MIB;
        let huge_file_threshold = 100 * MIB;
        let stream_chunk_size: usize = 8 << 20;

        Self {
            threshold,
            max_maps: 8,
            temp_dir: None,
            huge_file_threshold,
            stream_chunk_size,
            enable_streaming: true,
        }
    }
}
37
38struct MmapEntry {
40 _temp_file: NamedTempFile,
41 mmap: Mmap,
42}
43
/// Coarse size class of an archive entry, derived from its uncompressed size
/// and the configured thresholds.
///
/// The enum is fieldless, so it also derives `Copy`, `Eq` and `Hash`; it can
/// be passed by value and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileSizeCategory {
    /// Below the memory-map threshold.
    Small,
    /// At or above the memory-map threshold but below the huge threshold.
    Large,
    /// At or above the huge-file threshold.
    Huge,
}
54
/// How an archive entry should be read.
///
/// The enum is fieldless, so it also derives `Copy`, `Eq` and `Hash`; it can
/// be passed by value and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessingStrategy {
    /// Decompress straight into an in-memory buffer.
    Standard,
    /// Extract to a temporary file and memory-map it.
    MemoryMap,
    /// Read the entry chunk by chunk.
    Streaming,
}
65
66#[derive(Debug, Clone)]
68pub struct FileInfo {
69 pub name: String,
71 pub size: u64,
73 pub compressed_size: u64,
75 pub compression_ratio: f64,
77 pub category: FileSizeCategory,
79 pub recommended_strategy: ProcessingStrategy,
81}
82
83pub struct ZipDocumentReader<R: Read + Seek> {
85 pub(crate) zip: ZipArchive<R>,
86 mmap_config: MmapConfig,
87 mmap_cache: HashMap<String, MmapEntry>,
88 access_count: HashMap<String, u64>,
89}
90
91impl<R: Read + Seek> ZipDocumentReader<R> {
92 pub fn new(r: R) -> io::Result<Self> {
94 Self::with_mmap_config(r, MmapConfig::default())
95 }
96
97 pub fn with_mmap_config(
99 r: R,
100 config: MmapConfig,
101 ) -> io::Result<Self> {
102 Ok(Self {
103 zip: ZipArchive::new(r)?,
104 mmap_config: config,
105 mmap_cache: HashMap::new(),
106 access_count: HashMap::new(),
107 })
108 }
109 pub fn read_all(
111 &mut self,
112 name: &str,
113 ) -> io::Result<Vec<u8>> {
114 self.read_smart(name)
116 }
117
118 pub fn read_smart(
120 &mut self,
121 name: &str,
122 ) -> io::Result<Vec<u8>> {
123 let file_info = self.get_file_info(name)?;
124
125 match file_info.recommended_strategy {
126 ProcessingStrategy::Standard => {
127 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
129 self.read_standard(name)
130 },
131 ProcessingStrategy::MemoryMap => {
132 match self.read_mmap(name) {
134 Ok(data) => Ok(data.to_vec()),
135 Err(_) => {
136 *self
138 .access_count
139 .entry(name.to_string())
140 .or_insert(0) += 1;
141 self.read_standard(name)
142 },
143 }
144 },
145 ProcessingStrategy::Streaming => {
146 if self.mmap_config.enable_streaming {
148 *self.access_count.entry(name.to_string()).or_insert(0) +=
149 1;
150 self.read_huge_file_streaming(name)
151 } else {
152 match self.read_mmap(name) {
154 Ok(data) => Ok(data.to_vec()),
155 Err(_) => {
156 *self
157 .access_count
158 .entry(name.to_string())
159 .or_insert(0) += 1;
160 self.read_standard(name)
161 },
162 }
163 }
164 },
165 }
166 }
167
168 pub fn read_mmap(
170 &mut self,
171 name: &str,
172 ) -> io::Result<&[u8]> {
173 if self.mmap_cache.contains_key(name) {
175 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
177 return Ok(&self.mmap_cache[name].mmap[..]);
178 }
179
180 if self.mmap_cache.len() >= self.mmap_config.max_maps {
182 self.evict_least_used();
183 }
184
185 self.create_mmap_entry(name)?;
187
188 self.access_count.insert(name.to_string(), 1);
190
191 Ok(&self.mmap_cache[name].mmap[..])
192 }
193
194 pub fn read_standard(
196 &mut self,
197 name: &str,
198 ) -> io::Result<Vec<u8>> {
199 let mut f = self.zip.by_name(name)?;
200 let mut buf = Vec::with_capacity(f.size() as usize);
201 std::io::copy(&mut f, &mut buf)?;
202 Ok(buf)
203 }
204
205 pub fn read_plugin_state(
207 &mut self,
208 plugin_name: &str,
209 ) -> io::Result<Option<Vec<u8>>> {
210 let plugin_file_path = format!("plugins/{plugin_name}");
211 match self.read_all(&plugin_file_path) {
212 Ok(data) => Ok(Some(data)),
213 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
214 Err(e) => Err(e),
215 }
216 }
217
218 pub fn read_all_plugin_states(
220 &mut self
221 ) -> io::Result<std::collections::HashMap<String, Vec<u8>>> {
222 let mut plugin_states = std::collections::HashMap::new();
223
224 let mut plugin_files = Vec::new();
226 for i in 0..self.zip.len() {
227 let file = self.zip.by_index(i)?;
228 let file_name = file.name().to_string();
229
230 if file_name.starts_with("plugins/") && !file_name.ends_with('/') {
231 let plugin_name =
232 file_name.strip_prefix("plugins/").unwrap().to_string();
233 plugin_files.push((plugin_name, file_name));
234 }
235 }
236
237 for (plugin_name, file_name) in plugin_files {
239 let data = self.read_all(&file_name)?;
240 plugin_states.insert(plugin_name, data);
241 }
242
243 Ok(plugin_states)
244 }
245
246 pub fn list_plugins(&mut self) -> io::Result<Vec<String>> {
248 let mut plugins = Vec::new();
249
250 for i in 0..self.zip.len() {
251 let file = self.zip.by_index(i)?;
252 let file_name = file.name();
253
254 if file_name.starts_with("plugins/") && !file_name.ends_with('/') {
255 let plugin_name =
256 file_name.strip_prefix("plugins/").unwrap().to_string();
257 plugins.push(plugin_name);
258 }
259 }
260
261 Ok(plugins)
262 }
263
264 pub fn has_plugin_state(
266 &mut self,
267 plugin_name: &str,
268 ) -> bool {
269 let plugin_file_path = format!("plugins/{plugin_name}");
270 self.zip.by_name(&plugin_file_path).is_ok()
271 }
272
273 fn create_mmap_entry(
275 &mut self,
276 name: &str,
277 ) -> io::Result<()> {
278 let mut temp_file =
280 if let Some(ref temp_dir) = self.mmap_config.temp_dir {
281 NamedTempFile::new_in(temp_dir)?
282 } else {
283 NamedTempFile::new()?
284 };
285
286 {
288 let mut zip_file = self.zip.by_name(name)?;
289 let mut writer = BufWriter::new(&mut temp_file);
290 std::io::copy(&mut zip_file, &mut writer)?;
291 writer.flush()?;
292 }
293
294 temp_file.as_file().sync_all()?;
296
297 let mmap = unsafe { MmapOptions::new().map(temp_file.as_file())? };
299
300 self.mmap_cache.insert(
302 name.to_string(),
303 MmapEntry { _temp_file: temp_file, mmap },
304 );
305
306 Ok(())
307 }
308
309 fn evict_least_used(&mut self) {
311 if let Some((lru_name, _)) = self
312 .access_count
313 .iter()
314 .min_by_key(|(_, count)| **count)
315 .map(|(name, count)| (name.clone(), *count))
316 {
317 self.mmap_cache.remove(&lru_name);
318 self.access_count.remove(&lru_name);
319 }
320 }
321
322 pub fn mmap_config(&self) -> &MmapConfig {
324 &self.mmap_config
325 }
326
327 pub fn mmap_stats(&self) -> MmapStats {
329 let total_size: u64 =
330 self.mmap_cache.values().map(|entry| entry.mmap.len() as u64).sum();
331
332 MmapStats {
333 cached_entries: self.mmap_cache.len(),
334 total_cached_size: total_size,
335 max_entries: self.mmap_config.max_maps,
336 threshold_bytes: self.mmap_config.threshold,
337 }
338 }
339
340 pub fn clear_mmap_cache(&mut self) {
342 self.mmap_cache.clear();
343 self.access_count.clear();
344 }
345
346 pub fn get_file_size(
348 &mut self,
349 name: &str,
350 ) -> io::Result<u64> {
351 let f = self.zip.by_name(name)?;
352 Ok(f.size())
353 }
354
355 pub fn get_compressed_size(
357 &mut self,
358 name: &str,
359 ) -> io::Result<u64> {
360 let f = self.zip.by_name(name)?;
361 Ok(f.compressed_size())
362 }
363
364 pub fn classify_file_size(
366 &mut self,
367 name: &str,
368 ) -> io::Result<FileSizeCategory> {
369 let file_size = self.get_file_size(name)?;
370
371 if file_size >= self.mmap_config.huge_file_threshold {
372 Ok(FileSizeCategory::Huge)
373 } else if file_size >= self.mmap_config.threshold {
374 Ok(FileSizeCategory::Large)
375 } else {
376 Ok(FileSizeCategory::Small)
377 }
378 }
379
380 pub fn get_file_info(
382 &mut self,
383 name: &str,
384 ) -> io::Result<FileInfo> {
385 let (size, compressed_size) = {
386 let f = self.zip.by_name(name)?;
387 (f.size(), f.compressed_size())
388 }; let category = if size >= self.mmap_config.huge_file_threshold {
391 FileSizeCategory::Huge
392 } else if size >= self.mmap_config.threshold {
393 FileSizeCategory::Large
394 } else {
395 FileSizeCategory::Small
396 };
397
398 let recommended_strategy = self.recommend_processing_strategy(size);
399
400 Ok(FileInfo {
401 name: name.to_string(),
402 size,
403 compressed_size,
404 compression_ratio: if size > 0 {
405 compressed_size as f64 / size as f64
406 } else {
407 1.0
408 },
409 category,
410 recommended_strategy,
411 })
412 }
413
414 pub fn recommend_processing_strategy(
416 &self,
417 file_size: u64,
418 ) -> ProcessingStrategy {
419 if file_size >= self.mmap_config.huge_file_threshold
420 && self.mmap_config.enable_streaming
421 {
422 ProcessingStrategy::Streaming
423 } else if file_size >= self.mmap_config.threshold {
424 ProcessingStrategy::MemoryMap
425 } else {
426 ProcessingStrategy::Standard
427 }
428 }
429
430 pub fn preheat_mmap(
432 &mut self,
433 names: &[&str],
434 ) -> io::Result<()> {
435 for &name in names {
436 if !self.mmap_cache.contains_key(name) {
437 let file_size = self.get_file_size(name)?;
438
439 if file_size >= self.mmap_config.threshold {
440 self.create_mmap_entry(name)?;
441 }
442 }
443 }
444 Ok(())
445 }
446
447 fn read_huge_file_streaming(
449 &mut self,
450 name: &str,
451 ) -> io::Result<Vec<u8>> {
452 let mut file = self.zip.by_name(name)?;
453 let total_size = file.size() as usize;
454 let mut result = Vec::with_capacity(total_size);
455
456 let chunk_size = self.mmap_config.stream_chunk_size;
457 let mut buffer = vec![0u8; chunk_size];
458
459 loop {
460 let bytes_read = file.read(&mut buffer)?;
461 if bytes_read == 0 {
462 break;
463 }
464 result.extend_from_slice(&buffer[..bytes_read]);
465 }
466
467 Ok(result)
468 }
469
470 pub fn create_stream_reader(
472 &mut self,
473 name: &str,
474 ) -> io::Result<ZipStreamReader> {
475 let mut file = self.zip.by_name(name)?;
477 let total_size = file.size();
478 let chunk_size = self.mmap_config.stream_chunk_size;
479
480 let mut chunks = Vec::new();
481 let mut buffer = vec![0u8; chunk_size];
482
483 loop {
484 let bytes_read = file.read(&mut buffer)?;
485 if bytes_read == 0 {
486 break;
487 }
488
489 chunks.push(buffer[..bytes_read].to_vec());
490 }
491
492 Ok(ZipStreamReader {
493 chunks: chunks.into_iter(),
494 total_size,
495 current_pos: 0,
496 })
497 }
498
499 pub fn process_smart<F>(
501 &mut self,
502 name: &str,
503 mut processor: F,
504 ) -> io::Result<()>
505 where
506 F: FnMut(&[u8]) -> io::Result<()>,
507 {
508 let file_info = self.get_file_info(name)?;
509
510 match file_info.recommended_strategy {
511 ProcessingStrategy::Standard => {
512 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
514 let data = self.read_standard(name)?;
515 processor(&data)
516 },
517 ProcessingStrategy::MemoryMap => {
518 match self.read_mmap(name) {
520 Ok(data) => {
521 processor(data)
523 },
524 Err(_) => {
525 *self
527 .access_count
528 .entry(name.to_string())
529 .or_insert(0) += 1;
530 let data = self.read_standard(name)?;
531 processor(&data)
532 },
533 }
534 },
535 ProcessingStrategy::Streaming => {
536 if self.mmap_config.enable_streaming {
538 *self.access_count.entry(name.to_string()).or_insert(0) +=
539 1;
540 self.process_huge_file(name, processor)
541 } else {
542 match self.read_mmap(name) {
544 Ok(data) => processor(data),
545 Err(_) => {
546 *self
547 .access_count
548 .entry(name.to_string())
549 .or_insert(0) += 1;
550 let data = self.read_standard(name)?;
551 processor(&data)
552 },
553 }
554 }
555 },
556 }
557 }
558
559 pub fn process_files_smart<F>(
561 &mut self,
562 file_names: &[&str],
563 mut processor: F,
564 ) -> io::Result<()>
565 where
566 F: FnMut(&str, &[u8]) -> io::Result<()>,
567 {
568 for &name in file_names {
569 let file_info = self.get_file_info(name)?;
570
571 match file_info.recommended_strategy {
573 ProcessingStrategy::Standard
574 | ProcessingStrategy::MemoryMap => {
575 let data = self.read_smart(name)?;
577 processor(name, &data)?;
578 },
579 ProcessingStrategy::Streaming => {
580 let mut accumulated_data = Vec::new();
582 self.process_smart(name, |chunk| {
583 accumulated_data.extend_from_slice(chunk);
584 Ok(())
585 })?;
586 processor(name, &accumulated_data)?;
587 },
588 }
589 }
590
591 Ok(())
592 }
593
594 pub fn process_huge_file<F>(
596 &mut self,
597 name: &str,
598 mut processor: F,
599 ) -> io::Result<()>
600 where
601 F: FnMut(&[u8]) -> io::Result<()>,
602 {
603 let mut file = self.zip.by_name(name)?;
604 let chunk_size = self.mmap_config.stream_chunk_size;
605 let mut buffer = vec![0u8; chunk_size];
606
607 loop {
608 let bytes_read = file.read(&mut buffer)?;
609 if bytes_read == 0 {
610 break;
611 }
612 processor(&buffer[..bytes_read])?;
613 }
614
615 Ok(())
616 }
617}
618
/// Snapshot of the memory-map cache.
#[derive(Debug, Clone)]
pub struct MmapStats {
    /// Number of entries currently mapped.
    pub cached_entries: usize,
    /// Sum of the lengths of all cached maps, in bytes.
    pub total_cached_size: u64,
    /// Configured maximum number of cached maps.
    pub max_entries: usize,
    /// Configured memory-map size threshold, in bytes.
    pub threshold_bytes: u64,
}

impl std::fmt::Display for MmapStats {
    /// Formats the snapshot on one line, with sizes in MB (1 MB = 1024 * 1024
    /// bytes) to two decimal places.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

        let cached_mb = self.total_cached_size as f64 / BYTES_PER_MB;
        let threshold_mb = self.threshold_bytes as f64 / BYTES_PER_MB;

        write!(
            f,
            "mmap 缓存: {}/{} 条目, {:.2} MB 总大小, 阈值 {:.2} MB",
            self.cached_entries, self.max_entries, cached_mb, threshold_mb
        )
    }
}
647
/// Chunked reader over the already-extracted contents of one archive entry.
///
/// The chunk list is never consumed: reads locate the next chunk from
/// `current_pos` and hand out its bytes, so [`ZipStreamReader::reset`] really
/// rewinds the reader and [`ZipStreamReader::compute_hash`] always covers the
/// whole entry. The field types are left unchanged because the value is
/// constructed outside this type.
pub struct ZipStreamReader {
    /// All chunks of the entry, in order. Only inspected through `as_slice`;
    /// the iterator itself is never advanced.
    chunks: std::vec::IntoIter<Vec<u8>>,
    /// Total size of the entry in bytes, as supplied at construction.
    total_size: u64,
    /// Number of bytes handed out since construction or the last reset.
    current_pos: u64,
}

impl ZipStreamReader {
    /// Returns a copy of the next chunk, or `Ok(None)` once every chunk has
    /// been handed out. Empty chunks are skipped.
    ///
    /// Each call walks the chunk list to find the chunk that starts at the
    /// current position. To consume the rest of the stream in a single pass
    /// without copying, use [`Self::process_chunks`].
    pub fn read_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
        let mut offset = 0u64;
        for chunk in self.chunks.as_slice() {
            let end = offset + chunk.len() as u64;
            if !chunk.is_empty() && offset >= self.current_pos {
                self.current_pos = end;
                return Ok(Some(chunk.clone()));
            }
            offset = end;
        }
        Ok(None)
    }

    /// Total size of the entry in bytes.
    pub fn total_size(&self) -> u64 {
        self.total_size
    }

    /// Number of bytes handed out so far.
    pub fn position(&self) -> u64 {
        self.current_pos
    }

    /// `true` once the position has reached the total size.
    pub fn is_finished(&self) -> bool {
        self.current_pos >= self.total_size
    }

    /// Rewinds the reader to the first chunk.
    pub fn reset(&mut self) {
        self.current_pos = 0;
    }

    /// Passes every remaining chunk to `processor`, in order, without
    /// copying.
    ///
    /// The position is advanced past a chunk before `processor` sees it, so
    /// after an error that chunk counts as consumed.
    ///
    /// # Errors
    /// Stops at and returns the first error from `processor`.
    pub fn process_chunks<F>(&mut self, mut processor: F) -> io::Result<()>
    where
        F: FnMut(&[u8]) -> io::Result<()>,
    {
        let mut offset = 0u64;
        for chunk in self.chunks.as_slice() {
            let end = offset + chunk.len() as u64;
            if !chunk.is_empty() && offset >= self.current_pos {
                self.current_pos = end;
                processor(chunk.as_slice())?;
            }
            offset = end;
        }
        Ok(())
    }

    /// Concatenates all remaining chunks into one buffer.
    pub fn read_all_streaming(&mut self) -> io::Result<Vec<u8>> {
        // The chunks are already in memory, so their summed length is an
        // exact and safe capacity for the remainder.
        let stored: u64 = self
            .chunks
            .as_slice()
            .iter()
            .map(|chunk| chunk.len() as u64)
            .sum();
        let remaining = stored.saturating_sub(self.current_pos) as usize;

        let mut result = Vec::with_capacity(remaining);
        self.process_chunks(|chunk| {
            result.extend_from_slice(chunk);
            Ok(())
        })?;
        Ok(result)
    }

    /// Rewinds the reader and feeds every chunk of the entry to `hasher`.
    ///
    /// The reader is left positioned at the end.
    pub fn compute_hash<H>(&mut self, mut hasher: H) -> io::Result<()>
    where
        H: FnMut(&[u8]),
    {
        self.reset();
        self.process_chunks(|chunk| {
            hasher(chunk);
            Ok(())
        })
    }
}
727
#[cfg(test)]
mod tests {
    use super::*;
    use crate::zipdoc::ZipDocumentWriter;
    use std::io::Cursor;

    /// One mebibyte, in bytes.
    const MIB: usize = 1024 * 1024;

    /// Builds an in-memory archive holding `entries` as stored entries, in
    /// the given order.
    fn build_archive<S: AsRef<str>>(entries: &[(S, Vec<u8>)]) -> io::Result<Vec<u8>> {
        let mut bytes = Vec::new();
        {
            let mut writer = ZipDocumentWriter::new(Cursor::new(&mut bytes))?;
            for (name, content) in entries {
                writer.add_stored(name.as_ref(), content)?;
            }
            writer.finalize()?;
        }
        Ok(bytes)
    }

    /// Small entries bypass the cache; large ones are mapped once and reused.
    #[test]
    fn test_mmap_integration_basic() -> io::Result<()> {
        let archive = build_archive(&[
            ("small.txt", b"small content".to_vec()),
            ("large.bin", vec![42u8; 2 * MIB]),
        ])?;
        let mut reader = ZipDocumentReader::new(Cursor::new(archive))?;

        let small = reader.read_all("small.txt")?;
        assert_eq!(small, b"small content");
        assert_eq!(reader.mmap_stats().cached_entries, 0);

        let large = reader.read_all("large.bin")?;
        assert_eq!(large.len(), 2 * MIB);
        assert!(large.iter().all(|&byte| byte == 42));

        let after_first = reader.mmap_stats();
        assert_eq!(after_first.cached_entries, 1);
        assert_eq!(after_first.total_cached_size, (2 * MIB) as u64);

        // A second read must hit the existing map, not create another one.
        let large_again = reader.read_all("large.bin")?;
        assert_eq!(large_again, large);
        assert_eq!(reader.mmap_stats().cached_entries, 1);

        Ok(())
    }

    /// `read_mmap` exposes the mapped bytes directly and caches the map.
    #[test]
    fn test_mmap_zero_copy_read() -> io::Result<()> {
        let archive = build_archive(&[("test.bin", vec![123u8; 3 * MIB])])?;
        let mut reader = ZipDocumentReader::new(Cursor::new(archive))?;

        let mapped = reader.read_mmap("test.bin")?;
        assert_eq!(mapped.len(), 3 * MIB);
        assert!(mapped.iter().all(|&byte| byte == 123));

        let stats = reader.mmap_stats();
        assert_eq!(stats.cached_entries, 1);
        assert_eq!(stats.total_cached_size, (3 * MIB) as u64);

        Ok(())
    }

    /// With room for two maps, mapping a third entry evicts one of them.
    #[test]
    fn test_mmap_cache_eviction() -> io::Result<()> {
        let config = MmapConfig {
            threshold: 1024,
            max_maps: 2,
            ..MmapConfig::default()
        };

        let entries: Vec<(String, Vec<u8>)> = (1u8..=3)
            .map(|i| (format!("file{i}.bin"), vec![i; 2048]))
            .collect();
        let archive = build_archive(&entries)?;
        let mut reader = ZipDocumentReader::with_mmap_config(Cursor::new(archive), config)?;

        let _first = reader.read_all("file1.bin")?;
        let _second = reader.read_all("file2.bin")?;
        assert_eq!(reader.mmap_stats().cached_entries, 2);

        let _third = reader.read_all("file3.bin")?;
        assert_eq!(reader.mmap_stats().cached_entries, 2);

        Ok(())
    }

    /// Only entries at or above the configured threshold are mapped.
    #[test]
    fn test_mmap_config_threshold() -> io::Result<()> {
        let config = MmapConfig {
            threshold: 5 * MIB as u64,
            max_maps: 8,
            ..MmapConfig::default()
        };

        let archive = build_archive(&[
            ("small.bin", vec![1u8; MIB]),
            ("large.bin", vec![2u8; 6 * MIB]),
        ])?;
        let mut reader = ZipDocumentReader::with_mmap_config(Cursor::new(archive), config)?;

        let _small = reader.read_all("small.bin")?;
        assert_eq!(reader.mmap_stats().cached_entries, 0);

        let _large = reader.read_all("large.bin")?;
        assert_eq!(reader.mmap_stats().cached_entries, 1);

        Ok(())
    }

    /// Preheating maps the requested entries; later reads reuse those maps.
    #[test]
    fn test_mmap_preheat() -> io::Result<()> {
        let entries: Vec<(String, Vec<u8>)> = (1u8..=3)
            .map(|i| (format!("data{i}.bin"), vec![i; 2 * MIB]))
            .collect();
        let archive = build_archive(&entries)?;
        let mut reader = ZipDocumentReader::new(Cursor::new(archive))?;

        reader.preheat_mmap(&["data1.bin", "data2.bin"])?;
        assert_eq!(reader.mmap_stats().cached_entries, 2);

        let _first = reader.read_mmap("data1.bin")?;
        let _second = reader.read_mmap("data2.bin")?;
        assert_eq!(reader.mmap_stats().cached_entries, 2);

        Ok(())
    }

    /// The `Display` output carries the entry counts and sizes in MB.
    #[test]
    fn test_mmap_stats_display() {
        let stats = MmapStats {
            cached_entries: 3,
            total_cached_size: 5 * MIB as u64,
            max_entries: 8,
            threshold_bytes: MIB as u64,
        };

        let rendered = stats.to_string();
        assert!(rendered.contains("3/8 条目"));
        assert!(rendered.contains("5.00 MB"));
        assert!(rendered.contains("1.00 MB"));
    }
}