1use crate::{chunkpool::*, common::*, configure::*};
2use memmap2::Mmap;
3use std::cmp;
4use std::fs::File;
5use std::io::Cursor;
6use std::{fs::OpenOptions, path::PathBuf};
7
8#[derive(Debug)]
9pub struct SearchCp {
10 pool_path: PathBuf,
11 pool_head: Option<PoolHead>,
12 actual_size: Option<ActualSize>,
13 chunk_id: Option<u32>,
14 chunk_map: Option<Mmap>,
15 chunk_head: Option<ChunkHead>,
16 next_pkt_offset: Option<u32>,
17 data_file: Option<File>,
18}
19
20impl SearchCp {
21 pub fn new(configure: &'static Configure, dir_id: u64) -> Self {
22 let mut path = PathBuf::new();
23 path.push(configure.store_path.clone());
24 path.push(format!("{:03}", dir_id));
25 path.push("chunk_pool");
26
27 SearchCp {
28 pool_path: path,
29 pool_head: None,
30 actual_size: None,
31 chunk_id: None,
32 chunk_map: None,
33 chunk_head: None,
34 next_pkt_offset: None,
35 data_file: None,
36 }
37 }
38
39 pub fn load_chunk(&mut self, chunk_id: u32, offset_in_chunk: u32) -> Result<(), StoreError> {
40 let pkt_offset = cmp::max(offset_in_chunk, ChunkHead::serialize_size() as u32);
41
42 if self.chunk_map.is_some() && self.chunk_id.unwrap() == chunk_id {
43 self.next_pkt_offset = Some(pkt_offset);
44 return Ok(());
45 }
46 self.free_chunk()?;
47
48 if self.pool_head.is_none() {
49 let mut head_path = PathBuf::new();
50 head_path.push(&self.pool_path);
51 head_path.push("pool.pl");
52 let pool_head = read_pool_head(&head_path)?;
53 let actual_size = ActualSize::new(
54 pool_head.pool_size,
55 pool_head.file_size,
56 pool_head.chunk_size,
57 );
58 self.pool_head = Some(pool_head);
59 self.actual_size = Some(actual_size);
60 }
61 self.chunk_id = Some(chunk_id);
62 self.next_pkt_offset = Some(pkt_offset);
63
64 let file_chunk_num = self.actual_size.unwrap().file_chunk_num;
65 let data_file_id = chunk_id / file_chunk_num;
66 let data_file_path = self.pool_path.join(format!("{:03}.da", data_file_id));
67 let data_file = match OpenOptions::new()
68 .read(true)
69 .write(false)
70 .create(false)
71 .truncate(false)
72 .open(data_file_path)
73 {
74 Ok(file_fd) => file_fd,
75 Err(e) => {
76 return Err(StoreError::CliError(format!("open data file error: {}", e)));
77 }
78 };
79 self.data_file = Some(data_file);
80
81 let inner_chunk_id = chunk_id - data_file_id * file_chunk_num;
82 let chunk_size = self.pool_head.unwrap().chunk_size;
83 let chunk_offset = inner_chunk_id * chunk_size;
84 match get_rlk_chunk(
85 self.data_file.as_ref().unwrap(),
86 chunk_offset,
87 chunk_size as usize,
88 ) {
89 Ok(mmap) => self.chunk_map = Some(mmap),
90 Err(e) => {
91 return Err(StoreError::CliError(format!("map chunk error: {}", e)));
92 }
93 }
94
95 let mut cursor = Cursor::new(self.chunk_map.as_ref().unwrap());
96 let chunk_head = ChunkHead::deserialize_from(&mut cursor)?;
97 self.chunk_head = Some(chunk_head);
98 Ok(())
99 }
100
101 fn free_chunk(&mut self) -> Result<(), StoreError> {
102 if self.chunk_id.is_none()
103 || self.actual_size.is_none()
104 || self.data_file.is_none()
105 || self.pool_head.is_none()
106 {
107 return Ok(());
108 }
109
110 let chunk_id = self.chunk_id.unwrap();
111 let file_chunk_num = self.actual_size.unwrap().file_chunk_num;
112 let data_file_id = chunk_id / file_chunk_num;
113 let inner_chunk_id = chunk_id - data_file_id * file_chunk_num;
114 let chunk_size = self.pool_head.unwrap().chunk_size;
115 let chunk_offset = inner_chunk_id * chunk_size;
116
117 free_rlk_chunk(
118 self.data_file.as_ref().unwrap(),
119 chunk_offset,
120 self.pool_head.unwrap().chunk_size as usize,
121 )
122 }
123
124 pub fn next_link_pkt(&mut self) -> Option<StorePacket> {
125 self.next_pkt_offset?;
126
127 let offset = self.next_pkt_offset.unwrap() as usize;
128 let chunk = self.chunk_map.as_ref().unwrap();
129 let mut cursor = Cursor::new(&chunk[offset..]);
130
131 if let Ok(pkt) = StorePacket::deserialize_from(&mut cursor) {
132 if pkt.next_offset == 0 {
133 self.next_pkt_offset = None;
134 } else {
135 self.next_pkt_offset = Some(pkt.next_offset);
136 }
137 return Some(pkt);
138 }
139 None
140 }
141
142 pub fn next_pkt(&mut self) -> Option<StorePacket> {
143 self.next_pkt_offset?;
144 self.chunk_head.as_ref()?;
145
146 if self.next_pkt_offset.unwrap() >= self.chunk_head.as_ref().unwrap().filled_size {
147 return None;
148 }
149
150 let offset = self.next_pkt_offset.unwrap() as usize;
151 let chunk = self.chunk_map.as_ref().unwrap();
152 let mut cursor = Cursor::new(&chunk[offset..]);
153 let before_position = cursor.position();
154 if let Ok(pkt) = StorePacket::deserialize_from(&mut cursor) {
155 let after_position = cursor.position();
156 self.next_pkt_offset =
157 Some(offset as u32 + after_position as u32 - before_position as u32);
158 Some(pkt)
159 } else {
160 None
161 }
162 }
163}