1use std::{collections::HashMap, io::{self, Read, Seek, Write}, time::{SystemTime, UNIX_EPOCH, Duration}, sync::{Mutex, RwLock}, path::Path, fs::{File, OpenOptions}};
2use byteorder::{BigEndian, ByteOrder};
3
4use crate::hashes;
5
/// First byte of a versioned chunk file; the byte after it holds the format version.
const VERSION_MARKER: u8 = u8::MAX;
/// Chunk file format version written by this module and the only one `Chunk::init` accepts.
const CHUNK_VERSION: u8 = 1;

/// Size in bytes of the fixed packet header: sizeb (1) + status (1) + key_len (2)
/// + val_len (4) + expire (8).
const HEADER_SIZE: usize = 1 + 1 + 2 + 4 + 8;
/// Status byte value marking a packet as deleted; its slot may then be reused.
const STATUS_DELETED: u8 = 42;
11
12const MAX_PACKET_SIZEB: u8 = 22; struct Header {
15 sizeb: u8,
16 status: u8,
17 key_len: u16,
18 val_len: u32,
19 expire: u64
20}
21
22impl Header {
23 fn make(key: &[u8], value: &[u8], expire: u64) -> Self {
24 let key_len = key.len() as u16;
25 let val_len = value.len() as u32;
26
27 let sizeb = next_power_of_2((HEADER_SIZE as u64) + (key_len as u64) + (val_len as u64));
28
29 Header {
30 status: 0,
31 key_len,
32 val_len,
33 expire,
34 sizeb,
35 }
36 }
37
38 fn parse(header_buffer: [u8; HEADER_SIZE]) -> Self {
39 Header {
40 sizeb: header_buffer[0],
41 status: header_buffer[1],
42 key_len: BigEndian::read_u16(&header_buffer[2..4]),
43 val_len: BigEndian::read_u32(&header_buffer[4..8]),
44 expire: BigEndian::read_u64(&header_buffer[8..16])
45 }
46 }
47
48 fn write_to(&self, buffer: &mut [u8]) -> () {
49 buffer[0] = self.sizeb;
50 buffer[1] = self.status;
51 BigEndian::write_u16(&mut buffer[2..4], self.key_len);
52 BigEndian::write_u32(&mut buffer[4..8], self.val_len);
53 BigEndian::write_u64(&mut buffer[8..16], self.expire);
54 }
55
56 fn is_expired(&self) -> bool {
57 if self.expire == 0 {
58 false
59 } else {
60 let since_the_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
61 self.expire < since_the_epoch.as_secs()
62 }
63 }
64
65 fn is_deleted(&self) -> bool {
66 self.status == STATUS_DELETED
67 }
68
69 fn size(&self) -> usize {
70 1 << self.sizeb
71 }
72}
73
74fn read_header(file: &mut File) -> io::Result<Option<Header>> {
75 let mut header_buffer = [0u8; HEADER_SIZE];
76 let read_result = file.read_exact(&mut header_buffer);
77
78 match read_result {
79 Ok(_) => {
80 let header = Header::parse(header_buffer);
81 Ok(Some(header))
82 },
83 Err(e) => match e.kind() {
84 io::ErrorKind::UnexpectedEof => Ok(None),
85 _ => Err(e),
86 },
87 }
88}
89
/// Returns the exponent `p` of the smallest power of two with `2^p >= value`.
///
/// The result is capped at 32: any `value` above `2^31` (including values whose next
/// power of two would overflow `u64`) yields 32. `value == 0` yields 0.
fn next_power_of_2(value: u64) -> u8 {
    value
        .checked_next_power_of_two()
        .map_or(32, |power| power.trailing_zeros().min(32) as u8)
}
105
/// The chunk's backing file behind a mutex.
///
/// Each positioned operation seeks and performs its I/O while holding the lock, so
/// concurrent callers cannot interleave a seek with someone else's read or write.
struct ChunkFile {
    file: Mutex<File>,
}

impl ChunkFile {
    /// Flushes file data and metadata to the storage device.
    fn sync_all(&self) -> io::Result<()> {
        self.file.lock().unwrap().sync_all()
    }

    /// Fills `buf` completely with bytes starting at `pos`.
    fn read_exact_at(&self, buf: &mut [u8], pos: io::SeekFrom) -> io::Result<()> {
        let mut guard = self.file.lock().unwrap();
        guard.seek(pos)?;
        guard.read_exact(buf)
    }

    /// Writes all of `buf` starting at `pos`.
    ///
    /// Returns the absolute offset (from the start of the file) where the write began.
    fn write_at(&self, buf: &[u8], pos: io::SeekFrom) -> io::Result<u64> {
        let mut guard = self.file.lock().unwrap();
        let start = guard.seek(pos)?;
        guard.write_all(buf).map(|()| start)
    }
}
131
/// Where one packet lives in the chunk file.
struct OffsetAndSize {
    /// Byte offset of the packet's header from the start of the file.
    offset: u64,
    /// Size exponent: the packet occupies `1 << sizeb` bytes.
    sizeb: u8,
}

impl OffsetAndSize {
    /// Number of bytes the packet occupies on disk (`2^sizeb`).
    fn size(&self) -> usize {
        let exponent = u32::from(self.sizeb);
        1_usize << exponent
    }
}
142
143struct ChunkUnsafe {
144 file: ChunkFile,
145 map: HashMap<u32, Vec<OffsetAndSize>>, holes: HashMap<u64, u8>, need_sync: bool
148}
149
/// Outcome of a lookup performed by `ChunkUnsafe::get`.
enum GetResult<T> {
    /// The key was found and has not expired.
    Ok(T),
    /// No indexed packet stores the key.
    NotFound,
    /// The key was found but its expiry time has already passed.
    Expired,
}
155
156impl ChunkUnsafe {
157
158 fn sync(&mut self) -> io::Result<()> {
159 if self.need_sync {
160 self.need_sync = false;
161 self.file.sync_all()?;
162 }
163
164 Ok(())
165 }
166
167 fn get(&self, key: &[u8], hash: u32) -> io::Result<GetResult<Vec<u8>>> {
168 if let Some(offset_and_size_vec) = self.map.get(&hash) {
169 for offset_and_size in offset_and_size_vec {
170 let offset = offset_and_size.offset;
171 let size = offset_and_size.size();
172 let mut packet = vec![0; size];
173 self.file.read_exact_at(&mut packet, io::SeekFrom::Start(offset))?;
174 let (packet_header, packet_key, packet_val) = packet_unmarshal(&packet);
175
176 if key.len() != packet_key.len() || key != packet_key {
177 continue;
178 }
179
180 if packet_header.is_expired() {
181 return Ok(GetResult::Expired);
182 }
183
184 return Ok(GetResult::Ok(packet_val.to_vec()));
185 }
186 Ok(GetResult::NotFound)
187 } else {
188 Ok(GetResult::NotFound)
189 }
190 }
191
192 fn set(&mut self, key: &[u8], value: &[u8], hash: u32, ttl_sec: u32) -> io::Result<()> {
193 self.need_sync = true;
194 let expire = match ttl_sec {
195 0 => 0,
196 _ => {
197 let ttl_duration = Duration::from_secs(ttl_sec as u64);
198 let ttl_since_the_epoch = SystemTime::now()
199 .checked_add(ttl_duration).unwrap()
200 .duration_since(UNIX_EPOCH).unwrap();
201 ttl_since_the_epoch.as_secs()
202 }
203 };
204 let (new_header, new_packet) = packet_marshal(key, value, expire)?;
205 let mut new_offset = 0;
206 let mut key_old_offset = 0;
207
208 if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
209 for offset_and_size in offset_and_size_vec {
210 let old_offset = offset_and_size.offset;
211 let old_size = offset_and_size.size();
212 let mut old_packet = vec![0; old_size];
213 self.file.read_exact_at(&mut old_packet, io::SeekFrom::Start(old_offset))?;
214 let (old_header, old_key, _) = packet_unmarshal(&old_packet);
215
216 if key.len() != old_key.len() || key != old_key {
217 continue;
218 }
219
220 key_old_offset = old_offset;
221
222 if old_header.sizeb == new_header.sizeb {
223 new_offset = old_offset;
225 } else {
226 let status_deleted = [STATUS_DELETED];
228 self.file.write_at(&status_deleted, io::SeekFrom::Start(old_offset + 1))?;
229 self.holes.insert(old_offset, old_header.sizeb);
230
231 if let Some(hole_offset) = self.find_hole(new_header.sizeb) {
233 new_offset = hole_offset;
234 self.holes.remove(&hole_offset);
235 }
236 }
237
238 break;
239 }
240 } else {
241 self.map.insert(hash, Vec::with_capacity(1));
242 }
243
244 let new_offset_seek = if new_offset == 0 {
245 io::SeekFrom::End(0)
246 } else {
247 io::SeekFrom::Start(new_offset)
248 };
249
250 new_offset = self.file.write_at(&new_packet, new_offset_seek)?;
251 let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
252
253 if new_offset != key_old_offset {
254 if key_old_offset != 0 {
255 offset_and_size_vec.retain(|x| x.offset != key_old_offset);
256 }
257
258 offset_and_size_vec.push(OffsetAndSize { offset: new_offset, sizeb: new_header.sizeb });
259 }
260
261 Ok(())
262 }
263
264 fn delete (&mut self, key: &[u8], hash: u32) -> io::Result<bool> {
265 let mut key_packet_offset = 0;
266 let mut key_packet_sizeb = 0;
267
268 let mut checked_packet_keys = Vec::new();
269
270 if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
271 for offset_and_size in offset_and_size_vec {
272 let offset = offset_and_size.offset;
273 let size = offset_and_size.size();
274 let mut packet = vec![0; size];
275 self.file.read_exact_at(&mut packet, io::SeekFrom::Start(offset))?;
276 let (packet_header, packet_key, _) = packet_unmarshal(&packet);
277
278 if key.len() != packet_key.len() || key != packet_key {
279 checked_packet_keys.push(packet_key.to_vec());
280 continue;
281 }
282
283 key_packet_offset = offset;
284 key_packet_sizeb = packet_header.sizeb;
285
286 break;
287 }
288 }
289
290 if key_packet_offset != 0 && key_packet_sizeb != 0 {
291 let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
292 offset_and_size_vec.retain(|x| x.offset != key_packet_offset);
293 let status_deleted = [STATUS_DELETED];
294 self.file.write_at(&status_deleted, io::SeekFrom::Start(key_packet_offset + 1))?;
295 self.holes.insert(key_packet_offset, key_packet_sizeb);
296
297 Ok(true)
298 }
299 else {
300 Ok(false)
301 }
302 }
303
304 fn all_hash(&self) -> Vec<u32> {
305 let mut all_hash = Vec::with_capacity(self.map.len());
306
307 for (hash, _) in &self.map {
308 all_hash.push(*hash)
309 }
310
311 all_hash
312 }
313
314 fn delete_expired(&mut self, hash: u32) -> io::Result<()> {
315 let expired: io::Result<Vec<(u32, u64, u8)>> = {
316 let mut expired = Vec::new();
317
318 if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
319 for offset_and_size in offset_and_size_vec {
320 let offset = offset_and_size.offset;
321 let sizeb = offset_and_size.sizeb;
322 let mut header_buffer = [0u8; HEADER_SIZE];
323 self.file.read_exact_at(&mut header_buffer, io::SeekFrom::Start(offset))?;
324 let header = Header::parse(header_buffer);
325
326 if header.is_deleted() {
327 continue;
328 }
329
330 if header.is_expired() {
331 expired.push((hash, offset, sizeb));
332 }
333 }
334 }
335
336 Ok(expired)
337 };
338
339 for (hash, offset, sizeb) in expired? {
340 let status_deleted = [STATUS_DELETED];
341 self.file.write_at(&status_deleted, io::SeekFrom::Start(offset + 1))?;
342 let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
343 offset_and_size_vec.retain(|x| x.offset != offset);
344 self.holes.insert(offset, sizeb);
345 }
346
347 Ok(())
348 }
349
350 fn find_hole(&self, sizeb: u8) -> Option<u64> {
351 let holes = &self.holes;
352
353 for (hole_offset, hole_sizeb) in holes {
354 if *hole_sizeb == sizeb {
355 return Some(*hole_offset);
356 }
357 }
358
359 None
360 }
361}
362
363fn packet_marshal(key: &[u8], value: &[u8], expire: u64) -> io::Result<(Header, Vec<u8>)> {
364 let header = Header::make(key, value, expire);
365
366 if header.sizeb > MAX_PACKET_SIZEB {
367 return Err(io::Error::new(io::ErrorKind::InvalidInput, "Exceeded the maximum package size".to_string()))
368 }
369
370 let size = header.size();
371 let mut buffer = vec![0; size];
372 header.write_to(&mut buffer[0..size]);
373 buffer[HEADER_SIZE..HEADER_SIZE + (header.key_len as usize)].copy_from_slice(key);
374 buffer[HEADER_SIZE + (header.key_len as usize)..HEADER_SIZE + (header.key_len as usize) + (header.val_len as usize)].copy_from_slice(value);
375
376 Ok((header, buffer))
377}
378
379fn packet_unmarshal(packet: &[u8]) -> (Header, &[u8], &[u8]) {
380 let mut header_buffer = [0u8; HEADER_SIZE];
381 header_buffer.copy_from_slice(&packet[0..HEADER_SIZE]);
382 let header = Header::parse(header_buffer);
383 let key = &packet[HEADER_SIZE..HEADER_SIZE + (header.key_len as usize)];
384 let value = &packet[HEADER_SIZE + (header.key_len as usize)..HEADER_SIZE + (header.key_len as usize) + (header.val_len as usize)];
385
386 (header, key, value)
387}
388
389pub struct Chunk {
390 chunk_unsafe: RwLock<ChunkUnsafe>
391}
392
393impl Chunk {
394 pub fn init(file_path: &Path) -> io::Result<Chunk> {
395 let mut file = OpenOptions::new()
396 .read(true)
397 .write(true)
398 .create(true)
399 .open(file_path)?;
400 file.sync_all()?;
401 let mut map = HashMap::new();
402 let mut holes = HashMap::new();
403 let file_metadata = file.metadata()?;
404
405 if file_metadata.len() == 0 {
406 file.write(&[VERSION_MARKER, CHUNK_VERSION])?;
407 }
408 else {
409 let chunk_version = read_chunk_version(&mut file)?;
410
411 if chunk_version != 1 {
412 return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Unknown chunk version = {} in file", chunk_version)))
413 }
414
415 let mut current_offset = 2;
416
417 loop {
418 let option_header = read_header(&mut file)?;
419
420 match option_header {
421 None => break,
422 Some(header) => {
423 let key_len = header.key_len as usize;
425 let mut key = vec![0; key_len];
426 file.read_exact(&mut key)?;
427
428 let size = header.size();
430 let new_offset = file.seek(io::SeekFrom::Current(((size as u32) - (header.key_len as u32) - (HEADER_SIZE as u32)) as i64))?;
431
432 if header.is_deleted() || header.is_expired() {
434 holes.insert(current_offset, header.sizeb);
435 } else {
436 let hash = hashes::hash(&key);
437
438 if map.contains_key(&hash) == false {
439 map.insert(hash, Vec::new());
440 }
441
442 let offset_and_size_vec = map.get_mut(&hash).unwrap();
443 offset_and_size_vec.push(OffsetAndSize { offset: current_offset, sizeb: header.sizeb });
444 }
445
446 current_offset = new_offset;
447 },
448 }
449 }
450 }
451
452 let chunk_file = ChunkFile {
453 file: Mutex::new(file)
454 };
455 let chunk_unsafe = ChunkUnsafe {
456 file: chunk_file,
457 map,
458 holes,
459 need_sync: false
460 };
461 let chunk = Chunk {
462 chunk_unsafe: RwLock::new(chunk_unsafe)
463 };
464
465 Ok(chunk)
466 }
467
468 pub fn sync(&self) -> io::Result<()> {
469 let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
470 chunk_unsafe.sync()?;
471 Ok(())
472 }
473
474 pub fn get(&self, key: &[u8], hash: u32) -> io::Result<Option<Vec<u8>>> {
475 let result = {
476 let chunk_unsafe_read = self.chunk_unsafe.read().unwrap();
477 chunk_unsafe_read.get(key, hash)?
478 };
479
480 match result {
481 GetResult::Ok(value) => Ok(Some(value)),
482 GetResult::NotFound => Ok(None),
483 GetResult::Expired => {
484 let mut chunk_unsafe_write = self.chunk_unsafe.write().unwrap();
485 chunk_unsafe_write.delete(key, hash)?;
486 Ok(None)
487 },
488 }
489 }
490
491 pub fn set(&self, key: &[u8], value: &[u8], hash: u32, ttl_sec: u32) -> io::Result<()> {
492 let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
493 chunk_unsafe.set(key, value, hash, ttl_sec)
494 }
495
496 pub fn delete (&self, key: &[u8], hash: u32) -> io::Result<bool> {
497 let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
498 chunk_unsafe.delete(key, hash)
499 }
500
501 pub fn delete_expired(&self) -> io::Result<()> {
502 let all_hash = {
503 let chunk_unsafe_read = self.chunk_unsafe.read().unwrap();
504 chunk_unsafe_read.all_hash()
505 };
506
507 for hash in all_hash {
508 let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
509 chunk_unsafe.delete_expired(hash)?;
510 }
511
512 Ok(())
513 }
514}
515
516fn read_chunk_version(file: &mut File) -> io::Result<u8> {
517 let mut chunk_version_buffer = [0, 0];
518 file.read_exact(&mut chunk_version_buffer)?;
519
520 if chunk_version_buffer[0] == VERSION_MARKER {
521 Ok(chunk_version_buffer[1])
522 }
523 else {
524 Ok(0)
525 }
526}