1use std::io::{self, Read, Seek, Write, BufWriter};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use zip::ZipArchive;
5use memmap2::{Mmap, MmapOptions};
6use tempfile::NamedTempFile;
7
/// Tuning knobs that decide how [`ZipDocumentReader`] reads archive entries.
#[derive(Debug, Clone)]
pub struct MmapConfig {
    /// Uncompressed size in bytes at or above which an entry is read through a memory map.
    pub threshold: u64,
    /// Number of cached maps at which `read_mmap` evicts an entry before adding a new one.
    pub max_maps: usize,
    /// Directory for the temporary files backing the maps; `None` uses the default location.
    pub temp_dir: Option<PathBuf>,
    /// Uncompressed size in bytes at or above which an entry is classified as huge.
    pub huge_file_threshold: u64,
    /// Size in bytes of the buffer used by the chunked (streaming) read paths.
    pub stream_chunk_size: usize,
    /// Whether huge entries are routed to the chunked read path.
    pub enable_streaming: bool,
}

impl Default for MmapConfig {
    /// Defaults: 1 MiB map threshold, 8 cached maps, 100 MiB huge threshold,
    /// 8 MiB chunks, streaming enabled, default temp directory.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self {
            threshold: MIB,
            max_maps: 8,
            temp_dir: None,
            huge_file_threshold: 100 * MIB,
            stream_chunk_size: 8 * 1024 * 1024,
            enable_streaming: true,
        }
    }
}
37
38struct MmapEntry {
40 _temp_file: NamedTempFile,
41 mmap: Mmap,
42}
43
/// Coarse size class of an archive entry, derived from the configured thresholds.
#[derive(Clone, Debug, PartialEq)]
pub enum FileSizeCategory {
    /// Below the memory-map threshold.
    Small,
    /// At or above the memory-map threshold but below the huge-file threshold.
    Large,
    /// At or above the huge-file threshold.
    Huge,
}
54
/// How an archive entry should be read.
#[derive(Clone, Debug, PartialEq)]
pub enum ProcessingStrategy {
    /// Decompress straight into a heap buffer.
    Standard,
    /// Extract to a temporary file and read it through a memory map.
    MemoryMap,
    /// Read in fixed-size chunks.
    Streaming,
}
65
66#[derive(Debug, Clone)]
68pub struct FileInfo {
69 pub name: String,
71 pub size: u64,
73 pub compressed_size: u64,
75 pub compression_ratio: f64,
77 pub category: FileSizeCategory,
79 pub recommended_strategy: ProcessingStrategy,
81}
82
83pub struct ZipDocumentReader<R: Read + Seek> {
85 pub(crate) zip: ZipArchive<R>,
86 mmap_config: MmapConfig,
87 mmap_cache: HashMap<String, MmapEntry>,
88 access_count: HashMap<String, u64>,
89}
90
91impl<R: Read + Seek> ZipDocumentReader<R> {
92 pub fn new(r: R) -> io::Result<Self> {
94 Self::with_mmap_config(r, MmapConfig::default())
95 }
96
97 pub fn with_mmap_config(
99 r: R,
100 config: MmapConfig,
101 ) -> io::Result<Self> {
102 Ok(Self {
103 zip: ZipArchive::new(r)?,
104 mmap_config: config,
105 mmap_cache: HashMap::new(),
106 access_count: HashMap::new(),
107 })
108 }
109 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
111 crate_name = "file",
112 file_name = %name
113 )))]
114 pub fn read_all(
115 &mut self,
116 name: &str,
117 ) -> io::Result<Vec<u8>> {
118 self.read_smart(name)
120 }
121
122 pub fn read_smart(
124 &mut self,
125 name: &str,
126 ) -> io::Result<Vec<u8>> {
127 let file_info = self.get_file_info(name)?;
128
129 match file_info.recommended_strategy {
130 ProcessingStrategy::Standard => {
131 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
133 self.read_standard(name)
134 },
135 ProcessingStrategy::MemoryMap => {
136 match self.read_mmap(name) {
138 Ok(data) => Ok(data.to_vec()),
139 Err(_) => {
140 *self
142 .access_count
143 .entry(name.to_string())
144 .or_insert(0) += 1;
145 self.read_standard(name)
146 },
147 }
148 },
149 ProcessingStrategy::Streaming => {
150 if self.mmap_config.enable_streaming {
152 *self.access_count.entry(name.to_string()).or_insert(0) +=
153 1;
154 self.read_huge_file_streaming(name)
155 } else {
156 match self.read_mmap(name) {
158 Ok(data) => Ok(data.to_vec()),
159 Err(_) => {
160 *self
161 .access_count
162 .entry(name.to_string())
163 .or_insert(0) += 1;
164 self.read_standard(name)
165 },
166 }
167 }
168 },
169 }
170 }
171
172 pub fn read_mmap(
174 &mut self,
175 name: &str,
176 ) -> io::Result<&[u8]> {
177 if self.mmap_cache.contains_key(name) {
179 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
181 return Ok(&self.mmap_cache[name].mmap[..]);
182 }
183
184 if self.mmap_cache.len() >= self.mmap_config.max_maps {
186 self.evict_least_used();
187 }
188
189 self.create_mmap_entry(name)?;
191
192 self.access_count.insert(name.to_string(), 1);
194
195 Ok(&self.mmap_cache[name].mmap[..])
196 }
197
198 pub fn read_standard(
200 &mut self,
201 name: &str,
202 ) -> io::Result<Vec<u8>> {
203 let mut f = self.zip.by_name(name)?;
204 let mut buf = Vec::with_capacity(f.size() as usize);
205 std::io::copy(&mut f, &mut buf)?;
206 Ok(buf)
207 }
208
209 pub fn read_plugin_state(
211 &mut self,
212 plugin_name: &str,
213 ) -> io::Result<Option<Vec<u8>>> {
214 let plugin_file_path = format!("plugins/{plugin_name}");
215 match self.read_all(&plugin_file_path) {
216 Ok(data) => Ok(Some(data)),
217 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
218 Err(e) => Err(e),
219 }
220 }
221
222 pub fn read_all_plugin_states(
224 &mut self
225 ) -> io::Result<std::collections::HashMap<String, Vec<u8>>> {
226 let mut plugin_states = std::collections::HashMap::new();
227
228 let mut plugin_files = Vec::new();
230 for i in 0..self.zip.len() {
231 let file = self.zip.by_index(i)?;
232 let file_name = file.name().to_string();
233
234 if file_name.starts_with("plugins/") && !file_name.ends_with('/') {
235 let plugin_name =
236 file_name.strip_prefix("plugins/").unwrap().to_string();
237 plugin_files.push((plugin_name, file_name));
238 }
239 }
240
241 for (plugin_name, file_name) in plugin_files {
243 let data = self.read_all(&file_name)?;
244 plugin_states.insert(plugin_name, data);
245 }
246
247 Ok(plugin_states)
248 }
249
250 pub fn list_plugins(&mut self) -> io::Result<Vec<String>> {
252 let mut plugins = Vec::new();
253
254 for i in 0..self.zip.len() {
255 let file = self.zip.by_index(i)?;
256 let file_name = file.name();
257
258 if file_name.starts_with("plugins/") && !file_name.ends_with('/') {
259 let plugin_name =
260 file_name.strip_prefix("plugins/").unwrap().to_string();
261 plugins.push(plugin_name);
262 }
263 }
264
265 Ok(plugins)
266 }
267
268 pub fn has_plugin_state(
270 &mut self,
271 plugin_name: &str,
272 ) -> bool {
273 let plugin_file_path = format!("plugins/{plugin_name}");
274 self.zip.by_name(&plugin_file_path).is_ok()
275 }
276
277 fn create_mmap_entry(
279 &mut self,
280 name: &str,
281 ) -> io::Result<()> {
282 let mut temp_file =
284 if let Some(ref temp_dir) = self.mmap_config.temp_dir {
285 NamedTempFile::new_in(temp_dir)?
286 } else {
287 NamedTempFile::new()?
288 };
289
290 {
292 let mut zip_file = self.zip.by_name(name)?;
293 let mut writer = BufWriter::new(&mut temp_file);
294 std::io::copy(&mut zip_file, &mut writer)?;
295 writer.flush()?;
296 }
297
298 temp_file.as_file().sync_all()?;
300
301 let mmap = unsafe { MmapOptions::new().map(temp_file.as_file())? };
303
304 self.mmap_cache.insert(
306 name.to_string(),
307 MmapEntry { _temp_file: temp_file, mmap },
308 );
309
310 Ok(())
311 }
312
313 fn evict_least_used(&mut self) {
315 if let Some((lru_name, _)) = self
316 .access_count
317 .iter()
318 .min_by_key(|(_, count)| **count)
319 .map(|(name, count)| (name.clone(), *count))
320 {
321 self.mmap_cache.remove(&lru_name);
322 self.access_count.remove(&lru_name);
323 }
324 }
325
326 pub fn mmap_config(&self) -> &MmapConfig {
328 &self.mmap_config
329 }
330
331 pub fn mmap_stats(&self) -> MmapStats {
333 let total_size: u64 =
334 self.mmap_cache.values().map(|entry| entry.mmap.len() as u64).sum();
335
336 MmapStats {
337 cached_entries: self.mmap_cache.len(),
338 total_cached_size: total_size,
339 max_entries: self.mmap_config.max_maps,
340 threshold_bytes: self.mmap_config.threshold,
341 }
342 }
343
344 pub fn clear_mmap_cache(&mut self) {
346 self.mmap_cache.clear();
347 self.access_count.clear();
348 }
349
350 pub fn get_file_size(
352 &mut self,
353 name: &str,
354 ) -> io::Result<u64> {
355 let f = self.zip.by_name(name)?;
356 Ok(f.size())
357 }
358
359 pub fn get_compressed_size(
361 &mut self,
362 name: &str,
363 ) -> io::Result<u64> {
364 let f = self.zip.by_name(name)?;
365 Ok(f.compressed_size())
366 }
367
368 pub fn classify_file_size(
370 &mut self,
371 name: &str,
372 ) -> io::Result<FileSizeCategory> {
373 let file_size = self.get_file_size(name)?;
374
375 if file_size >= self.mmap_config.huge_file_threshold {
376 Ok(FileSizeCategory::Huge)
377 } else if file_size >= self.mmap_config.threshold {
378 Ok(FileSizeCategory::Large)
379 } else {
380 Ok(FileSizeCategory::Small)
381 }
382 }
383
384 pub fn get_file_info(
386 &mut self,
387 name: &str,
388 ) -> io::Result<FileInfo> {
389 let (size, compressed_size) = {
390 let f = self.zip.by_name(name)?;
391 (f.size(), f.compressed_size())
392 }; let category = if size >= self.mmap_config.huge_file_threshold {
395 FileSizeCategory::Huge
396 } else if size >= self.mmap_config.threshold {
397 FileSizeCategory::Large
398 } else {
399 FileSizeCategory::Small
400 };
401
402 let recommended_strategy = self.recommend_processing_strategy(size);
403
404 Ok(FileInfo {
405 name: name.to_string(),
406 size,
407 compressed_size,
408 compression_ratio: if size > 0 {
409 compressed_size as f64 / size as f64
410 } else {
411 1.0
412 },
413 category,
414 recommended_strategy,
415 })
416 }
417
418 pub fn recommend_processing_strategy(
420 &self,
421 file_size: u64,
422 ) -> ProcessingStrategy {
423 if file_size >= self.mmap_config.huge_file_threshold
424 && self.mmap_config.enable_streaming
425 {
426 ProcessingStrategy::Streaming
427 } else if file_size >= self.mmap_config.threshold {
428 ProcessingStrategy::MemoryMap
429 } else {
430 ProcessingStrategy::Standard
431 }
432 }
433
434 pub fn preheat_mmap(
436 &mut self,
437 names: &[&str],
438 ) -> io::Result<()> {
439 for &name in names {
440 if !self.mmap_cache.contains_key(name) {
441 let file_size = self.get_file_size(name)?;
442
443 if file_size >= self.mmap_config.threshold {
444 self.create_mmap_entry(name)?;
445 }
446 }
447 }
448 Ok(())
449 }
450
451 fn read_huge_file_streaming(
453 &mut self,
454 name: &str,
455 ) -> io::Result<Vec<u8>> {
456 let mut file = self.zip.by_name(name)?;
457 let total_size = file.size() as usize;
458 let mut result = Vec::with_capacity(total_size);
459
460 let chunk_size = self.mmap_config.stream_chunk_size;
461 let mut buffer = vec![0u8; chunk_size];
462
463 loop {
464 let bytes_read = file.read(&mut buffer)?;
465 if bytes_read == 0 {
466 break;
467 }
468 result.extend_from_slice(&buffer[..bytes_read]);
469 }
470
471 Ok(result)
472 }
473
474 pub fn create_stream_reader(
476 &mut self,
477 name: &str,
478 ) -> io::Result<ZipStreamReader> {
479 let mut file = self.zip.by_name(name)?;
481 let total_size = file.size();
482 let chunk_size = self.mmap_config.stream_chunk_size;
483
484 let mut chunks = Vec::new();
485 let mut buffer = vec![0u8; chunk_size];
486
487 loop {
488 let bytes_read = file.read(&mut buffer)?;
489 if bytes_read == 0 {
490 break;
491 }
492
493 chunks.push(buffer[..bytes_read].to_vec());
494 }
495
496 Ok(ZipStreamReader {
497 chunks: chunks.into_iter(),
498 total_size,
499 current_pos: 0,
500 })
501 }
502
503 pub fn process_smart<F>(
505 &mut self,
506 name: &str,
507 mut processor: F,
508 ) -> io::Result<()>
509 where
510 F: FnMut(&[u8]) -> io::Result<()>,
511 {
512 let file_info = self.get_file_info(name)?;
513
514 match file_info.recommended_strategy {
515 ProcessingStrategy::Standard => {
516 *self.access_count.entry(name.to_string()).or_insert(0) += 1;
518 let data = self.read_standard(name)?;
519 processor(&data)
520 },
521 ProcessingStrategy::MemoryMap => {
522 match self.read_mmap(name) {
524 Ok(data) => {
525 processor(data)
527 },
528 Err(_) => {
529 *self
531 .access_count
532 .entry(name.to_string())
533 .or_insert(0) += 1;
534 let data = self.read_standard(name)?;
535 processor(&data)
536 },
537 }
538 },
539 ProcessingStrategy::Streaming => {
540 if self.mmap_config.enable_streaming {
542 *self.access_count.entry(name.to_string()).or_insert(0) +=
543 1;
544 self.process_huge_file(name, processor)
545 } else {
546 match self.read_mmap(name) {
548 Ok(data) => processor(data),
549 Err(_) => {
550 *self
551 .access_count
552 .entry(name.to_string())
553 .or_insert(0) += 1;
554 let data = self.read_standard(name)?;
555 processor(&data)
556 },
557 }
558 }
559 },
560 }
561 }
562
563 pub fn process_files_smart<F>(
565 &mut self,
566 file_names: &[&str],
567 mut processor: F,
568 ) -> io::Result<()>
569 where
570 F: FnMut(&str, &[u8]) -> io::Result<()>,
571 {
572 for &name in file_names {
573 let file_info = self.get_file_info(name)?;
574
575 match file_info.recommended_strategy {
577 ProcessingStrategy::Standard
578 | ProcessingStrategy::MemoryMap => {
579 let data = self.read_smart(name)?;
581 processor(name, &data)?;
582 },
583 ProcessingStrategy::Streaming => {
584 let mut accumulated_data = Vec::new();
586 self.process_smart(name, |chunk| {
587 accumulated_data.extend_from_slice(chunk);
588 Ok(())
589 })?;
590 processor(name, &accumulated_data)?;
591 },
592 }
593 }
594
595 Ok(())
596 }
597
598 pub fn process_huge_file<F>(
600 &mut self,
601 name: &str,
602 mut processor: F,
603 ) -> io::Result<()>
604 where
605 F: FnMut(&[u8]) -> io::Result<()>,
606 {
607 let mut file = self.zip.by_name(name)?;
608 let chunk_size = self.mmap_config.stream_chunk_size;
609 let mut buffer = vec![0u8; chunk_size];
610
611 loop {
612 let bytes_read = file.read(&mut buffer)?;
613 if bytes_read == 0 {
614 break;
615 }
616 processor(&buffer[..bytes_read])?;
617 }
618
619 Ok(())
620 }
621}
622
/// Snapshot of the memory-map cache.
#[derive(Clone, Debug)]
pub struct MmapStats {
    /// Number of maps currently cached.
    pub cached_entries: usize,
    /// Sum of the lengths of all cached maps, in bytes.
    pub total_cached_size: u64,
    /// Configured cache limit.
    pub max_entries: usize,
    /// Configured memory-map threshold, in bytes.
    pub threshold_bytes: u64,
}

impl std::fmt::Display for MmapStats {
    /// Writes a one-line summary; byte counts are shown in MB (1024 * 1024
    /// bytes) with two decimals.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

        let cached_mb = self.total_cached_size as f64 / BYTES_PER_MB;
        let threshold_mb = self.threshold_bytes as f64 / BYTES_PER_MB;

        write!(
            f,
            "mmap 缓存: {}/{} 条目, {:.2} MB 总大小, 阈值 {:.2} MB",
            self.cached_entries, self.max_entries, cached_mb, threshold_mb
        )
    }
}
651
/// Chunk-by-chunk reader over data that is already held in memory.
///
/// The chunks are kept for the whole lifetime of the reader and the read
/// position is tracked purely through `current_pos`. That is what makes
/// [`reset`](Self::reset) a real rewind: previously chunks were moved out of
/// the iterator as they were read, so a reset could only zero the position
/// while the data was already gone.
pub struct ZipStreamReader {
    /// Chunk storage. Only inspected through `as_slice`; never advanced.
    chunks: std::vec::IntoIter<Vec<u8>>,
    /// Expected total number of bytes, as supplied by the creator.
    total_size: u64,
    /// Number of bytes handed out since construction or the last reset.
    current_pos: u64,
}

impl ZipStreamReader {
    /// Index of the first non-empty chunk that starts at or after `current_pos`,
    /// or the number of stored chunks when everything has been read.
    fn next_chunk_index(&self) -> usize {
        let stored = self.chunks.as_slice();
        let mut offset = 0u64;

        for (index, chunk) in stored.iter().enumerate() {
            // Empty chunks carry no data and would never advance the position.
            if offset >= self.current_pos && !chunk.is_empty() {
                return index;
            }
            offset += chunk.len() as u64;
        }

        stored.len()
    }

    /// Returns a copy of the next unread chunk, or `None` when all chunks have
    /// been read. Empty chunks are skipped.
    ///
    /// Locating the chunk scans the chunk list from the start; the bulk
    /// methods below avoid repeating that scan per chunk.
    pub fn read_chunk(&mut self) -> io::Result<Option<Vec<u8>>> {
        let index = self.next_chunk_index();

        match self.chunks.as_slice().get(index) {
            Some(chunk) => {
                self.current_pos += chunk.len() as u64;
                Ok(Some(chunk.clone()))
            }
            None => Ok(None),
        }
    }

    /// Expected total size in bytes.
    pub fn total_size(&self) -> u64 {
        self.total_size
    }

    /// Number of bytes read since construction or the last reset.
    pub fn position(&self) -> u64 {
        self.current_pos
    }

    /// Returns `true` once the position has reached the expected total size.
    pub fn is_finished(&self) -> bool {
        self.current_pos >= self.total_size
    }

    /// Rewinds the reader so that the next read starts at the first chunk again.
    pub fn reset(&mut self) {
        self.current_pos = 0;
    }

    /// Calls `processor` with every unread chunk, in order.
    ///
    /// A chunk counts as read as soon as it is handed to `processor`, even if
    /// `processor` then fails.
    ///
    /// # Errors
    /// Stops at, and returns, the first error produced by `processor`.
    pub fn process_chunks<F>(&mut self, mut processor: F) -> io::Result<()>
    where
        F: FnMut(&[u8]) -> io::Result<()>,
    {
        let start = self.next_chunk_index();

        for chunk in &self.chunks.as_slice()[start..] {
            if chunk.is_empty() {
                continue;
            }
            self.current_pos += chunk.len() as u64;
            processor(chunk)?;
        }

        Ok(())
    }

    /// Returns all unread bytes as one buffer and marks them as read.
    ///
    /// The buffer is sized from the bytes actually pending rather than from
    /// `total_size`, which need not match the stored data.
    pub fn read_all_streaming(&mut self) -> io::Result<Vec<u8>> {
        let start = self.next_chunk_index();
        let pending = &self.chunks.as_slice()[start..];
        let pending_len: usize = pending.iter().map(Vec::len).sum();

        let mut result = Vec::with_capacity(pending_len);
        for chunk in pending {
            result.extend_from_slice(chunk);
        }

        self.current_pos += pending_len as u64;
        Ok(result)
    }

    /// Rewinds to the start and feeds every chunk to `hasher`, in order.
    ///
    /// Afterwards the reader is positioned at the end of the data.
    pub fn compute_hash<H>(&mut self, mut hasher: H) -> io::Result<()>
    where
        H: FnMut(&[u8]),
    {
        self.reset();
        self.process_chunks(|chunk| {
            hasher(chunk);
            Ok(())
        })
    }
}
731
732#[cfg(test)]
733mod tests {
734 use super::*;
735 use std::io::Cursor;
736 use crate::zipdoc::ZipDocumentWriter;
737
#[test]
fn test_mmap_integration_basic() -> io::Result<()> {
    const LARGE_LEN: usize = 2 * 1024 * 1024;

    // Archive with one entry below and one above the default 1 MiB threshold.
    let mut archive_bytes = Vec::new();
    {
        let mut writer = ZipDocumentWriter::new(Cursor::new(&mut archive_bytes))?;
        writer.add_stored("small.txt", b"small content")?;

        let large_payload = vec![42u8; LARGE_LEN];
        writer.add_stored("large.bin", &large_payload)?;

        writer.finalize()?;
    }

    let mut reader = ZipDocumentReader::new(Cursor::new(archive_bytes))?;

    // Small entries bypass the map cache.
    let small = reader.read_all("small.txt")?;
    assert_eq!(small, b"small content");
    assert_eq!(reader.mmap_stats().cached_entries, 0);

    // Large entries are read through a map and cached.
    let first_read = reader.read_all("large.bin")?;
    assert_eq!(first_read.len(), LARGE_LEN);
    assert!(first_read.iter().all(|&byte| byte == 42));

    let after_first = reader.mmap_stats();
    assert_eq!(after_first.cached_entries, 1);
    assert_eq!(after_first.total_cached_size, LARGE_LEN as u64);

    // A second read is served from the cache and adds no entry.
    let second_read = reader.read_all("large.bin")?;
    assert_eq!(second_read, first_read);
    assert_eq!(reader.mmap_stats().cached_entries, 1);

    Ok(())
}
788
#[test]
fn test_mmap_zero_copy_read() -> io::Result<()> {
    const PAYLOAD_LEN: usize = 3 * 1024 * 1024;

    let mut archive_bytes = Vec::new();
    {
        let mut writer = ZipDocumentWriter::new(Cursor::new(&mut archive_bytes))?;
        let payload = vec![123u8; PAYLOAD_LEN];
        writer.add_stored("test.bin", &payload)?;
        writer.finalize()?;
    }

    let mut reader = ZipDocumentReader::new(Cursor::new(archive_bytes))?;

    // The slice borrows directly from the cached map.
    let mapped = reader.read_mmap("test.bin")?;
    assert_eq!(mapped.len(), PAYLOAD_LEN);
    assert!(mapped.iter().all(|&byte| byte == 123));

    let stats = reader.mmap_stats();
    assert_eq!(stats.cached_entries, 1);
    assert_eq!(stats.total_cached_size, PAYLOAD_LEN as u64);

    Ok(())
}
816
#[test]
fn test_mmap_cache_eviction() -> io::Result<()> {
    // Low threshold and a two-entry cache so that a third map forces an eviction.
    let config = MmapConfig {
        threshold: 1024,
        max_maps: 2,
        ..MmapConfig::default()
    };

    let mut archive_bytes = Vec::new();
    {
        let mut writer = ZipDocumentWriter::new(Cursor::new(&mut archive_bytes))?;
        for index in 1u8..=3 {
            let content = vec![index; 2048];
            writer.add_stored(&format!("file{index}.bin"), &content)?;
        }
        writer.finalize()?;
    }

    let mut reader =
        ZipDocumentReader::with_mmap_config(Cursor::new(archive_bytes), config)?;

    let _first = reader.read_all("file1.bin")?;
    let _second = reader.read_all("file2.bin")?;
    assert_eq!(reader.mmap_stats().cached_entries, 2);

    // The cache is full, so this read must replace an existing entry.
    let _third = reader.read_all("file3.bin")?;
    assert_eq!(reader.mmap_stats().cached_entries, 2);

    Ok(())
}
859
#[test]
fn test_mmap_config_threshold() -> io::Result<()> {
    const MIB: usize = 1024 * 1024;

    // Raise the map threshold to 5 MiB; everything else stays at the defaults.
    let config = MmapConfig {
        threshold: 5 * 1024 * 1024,
        max_maps: 8,
        ..MmapConfig::default()
    };

    let mut archive_bytes = Vec::new();
    {
        let mut writer = ZipDocumentWriter::new(Cursor::new(&mut archive_bytes))?;

        let below_threshold = vec![1u8; MIB];
        writer.add_stored("small.bin", &below_threshold)?;

        let above_threshold = vec![2u8; 6 * MIB];
        writer.add_stored("large.bin", &above_threshold)?;

        writer.finalize()?;
    }

    let mut reader =
        ZipDocumentReader::with_mmap_config(Cursor::new(archive_bytes), config)?;

    // 1 MiB is below the raised threshold: no map is created.
    let _small = reader.read_all("small.bin")?;
    assert_eq!(reader.mmap_stats().cached_entries, 0);

    // 6 MiB is above it: the entry is mapped and cached.
    let _large = reader.read_all("large.bin")?;
    assert_eq!(reader.mmap_stats().cached_entries, 1);

    Ok(())
}
900
#[test]
fn test_mmap_preheat() -> io::Result<()> {
    let mut archive_bytes = Vec::new();
    {
        let mut writer = ZipDocumentWriter::new(Cursor::new(&mut archive_bytes))?;
        for index in 1u8..=3 {
            let content = vec![index; 2 * 1024 * 1024];
            writer.add_stored(&format!("data{index}.bin"), &content)?;
        }
        writer.finalize()?;
    }

    let mut reader = ZipDocumentReader::new(Cursor::new(archive_bytes))?;

    // Preheating maps exactly the requested entries.
    reader.preheat_mmap(&["data1.bin", "data2.bin"])?;
    assert_eq!(reader.mmap_stats().cached_entries, 2);

    // Reading the preheated entries hits the cache and adds nothing.
    let _first = reader.read_mmap("data1.bin")?;
    let _second = reader.read_mmap("data2.bin")?;
    assert_eq!(reader.mmap_stats().cached_entries, 2);

    Ok(())
}
936
#[test]
fn test_mmap_stats_display() {
    const MIB: u64 = 1024 * 1024;

    let rendered = MmapStats {
        cached_entries: 3,
        total_cached_size: 5 * MIB,
        max_entries: 8,
        threshold_bytes: MIB,
    }
    .to_string();

    // Entry count, cached size, and threshold must all appear in the summary.
    for expected in ["3/8 条目", "5.00 MB", "1.00 MB"] {
        assert!(rendered.contains(expected));
    }
}
951}