cheetah_kv/
chunks.rs

1use std::{collections::HashMap, io::{self, Read, Seek, Write}, time::{SystemTime, UNIX_EPOCH, Duration}, sync::{Mutex, RwLock}, path::Path, fs::{File, OpenOptions}};
2use byteorder::{BigEndian, ByteOrder};
3
4use crate::hashes;
5
/// First byte of a chunk file; when present, the following byte holds the format version.
const VERSION_MARKER: u8 = 255;
/// Format version written right after `VERSION_MARKER` when a new chunk file is created.
const CHUNK_VERSION: u8 = 1;

/// Serialized size of a packet header in bytes (1 + 1 + 2 + 4 + 8).
const HEADER_SIZE: usize = 16;
/// Value of the header status byte that marks a packet as deleted.
const STATUS_DELETED: u8 = 42;

/// Largest accepted packet size exponent: a packet may occupy at most `1 << 22` bytes (4 MiB).
const MAX_PACKET_SIZEB: u8 = 22;
13
14struct Header {
15    sizeb: u8,
16    status: u8,
17    key_len: u16,
18    val_len: u32,
19    expire: u64
20}
21
22impl Header {
23    fn make(key: &[u8], value: &[u8], expire: u64) -> Self {
24        let key_len = key.len() as u16;
25        let val_len = value.len() as u32;
26
27        let sizeb = next_power_of_2((HEADER_SIZE as u64) + (key_len as u64) + (val_len as u64));
28
29        Header {  
30            status: 0, 
31            key_len, 
32            val_len, 
33            expire,
34            sizeb,
35        }
36    }
37
38    fn parse(header_buffer: [u8; HEADER_SIZE]) -> Self {
39        Header { 
40            sizeb: header_buffer[0], 
41            status: header_buffer[1], 
42            key_len: BigEndian::read_u16(&header_buffer[2..4]), 
43            val_len: BigEndian::read_u32(&header_buffer[4..8]), 
44            expire: BigEndian::read_u64(&header_buffer[8..16]) 
45        }
46    }
47
48    fn write_to(&self, buffer: &mut [u8]) -> () {
49        buffer[0] = self.sizeb;
50        buffer[1] = self.status;
51        BigEndian::write_u16(&mut buffer[2..4], self.key_len);
52        BigEndian::write_u32(&mut buffer[4..8], self.val_len);
53        BigEndian::write_u64(&mut buffer[8..16], self.expire);
54    }
55
56    fn is_expired(&self) -> bool {
57        if self.expire == 0 {
58            false
59        } else {
60            let since_the_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
61            self.expire < since_the_epoch.as_secs()
62        }
63    }
64
65    fn is_deleted(&self) -> bool {
66        self.status == STATUS_DELETED
67    }
68
69    fn size(&self) -> usize {
70        1 << self.sizeb
71    }
72}
73
74fn read_header(file: &mut File) -> io::Result<Option<Header>> {
75    let mut header_buffer = [0u8; HEADER_SIZE];
76    let read_result = file.read_exact(&mut header_buffer);
77
78    match read_result {
79        Ok(_) => {
80            let header = Header::parse(header_buffer);
81            Ok(Some(header))
82        },
83        Err(e) => match e.kind() {
84            io::ErrorKind::UnexpectedEof => Ok(None),
85            _ => Err(e),
86        },
87    }
88}
89
/// Returns the smallest exponent `p` such that `2^p >= value`, capped at 32.
///
/// Despite the name, the result is the exponent, not the power itself.
/// `0` and `1` both map to exponent `0`; anything above `2^31` maps to `32`.
fn next_power_of_2(value: u64) -> u8 {
    const EXPONENT_CAP: u8 = 32;

    (0..EXPONENT_CAP)
        .find(|&exponent| (1u64 << exponent) >= value)
        .unwrap_or(EXPONENT_CAP)
}
105
/// A file handle behind a mutex, so that each positioned read or write
/// (seek followed by the transfer) runs under a single lock acquisition.
struct ChunkFile {
    file: Mutex<File>,
}

impl ChunkFile {
    /// Calls `File::sync_all` on the underlying file.
    fn sync_all(&self) -> io::Result<()> {
        self.file.lock().unwrap().sync_all()
    }

    /// Seeks to `pos` and fills `buf` completely, failing if the file ends early.
    fn read_exact_at(&self, buf: &mut [u8], pos: io::SeekFrom) -> io::Result<()> {
        let mut guard = self.file.lock().unwrap();
        guard.seek(pos).and_then(|_| guard.read_exact(buf))
    }

    /// Seeks to `pos`, writes all of `buf`, and returns the absolute offset
    /// at which the write started.
    fn write_at(&self, buf: &[u8], pos: io::SeekFrom) -> io::Result<u64> {
        let mut guard = self.file.lock().unwrap();
        let start = guard.seek(pos)?;
        guard.write_all(buf).map(|()| start)
    }
}
131
/// Location of one packet slot inside the chunk file.
struct OffsetAndSize {
    /// Absolute byte offset of the slot's first byte.
    offset: u64,
    /// Base-2 exponent of the slot size.
    sizeb: u8,
}

impl OffsetAndSize {
    /// Slot size in bytes (`1 << sizeb`).
    fn size(&self) -> usize {
        let one: usize = 1;
        one << self.sizeb
    }
}
142
/// Chunk state without any locking of its own; `Chunk` wraps it in an `RwLock`.
struct ChunkUnsafe {
    /// The backing chunk file.
    file: ChunkFile,
    /// Key hash -> slots of every indexed packet whose key has that hash
    /// (several keys may share one hash, hence the `Vec`).
    map: HashMap<u32, Vec<OffsetAndSize>>,
    /// Slot offset -> `sizeb` of slots that were deleted or found expired
    /// and can be handed out again by `find_hole`.
    holes: HashMap<u64, u8>,
    /// Set by `set`, cleared by `sync`; tells `sync` whether there is anything to flush.
    need_sync: bool
}
149
/// Outcome of a lookup in `ChunkUnsafe::get`.
enum GetResult<T> {
    /// The key was found and its packet has not expired.
    Ok(T),
    /// No packet with this key is indexed.
    NotFound,
    /// The key was found but its packet's expiry time has passed.
    Expired,
}
155
156impl ChunkUnsafe {
157
158    fn sync(&mut self) -> io::Result<()> {
159        if self.need_sync {
160            self.need_sync = false;
161            self.file.sync_all()?;
162        }
163
164        Ok(())
165    }
166
167    fn get(&self, key: &[u8], hash: u32) -> io::Result<GetResult<Vec<u8>>> {
168        if let Some(offset_and_size_vec) = self.map.get(&hash) {
169            for offset_and_size in offset_and_size_vec {
170                let offset = offset_and_size.offset;
171                let size = offset_and_size.size();
172                let mut packet = vec![0; size];
173                self.file.read_exact_at(&mut packet, io::SeekFrom::Start(offset))?;
174                let (packet_header, packet_key, packet_val) = packet_unmarshal(&packet);
175
176                if key.len() != packet_key.len() || key != packet_key {
177                    continue;
178                }
179
180                if packet_header.is_expired() {
181                    return Ok(GetResult::Expired);
182                }
183
184                return Ok(GetResult::Ok(packet_val.to_vec()));
185            }
186            Ok(GetResult::NotFound)
187        } else {
188            Ok(GetResult::NotFound)
189        }
190    }
191    
192    fn set(&mut self, key: &[u8], value: &[u8], hash: u32, ttl_sec: u32) -> io::Result<()> {
193        self.need_sync = true;
194        let expire = match ttl_sec {
195            0 => 0,      
196            _ => {
197                let ttl_duration = Duration::from_secs(ttl_sec as u64);
198                let ttl_since_the_epoch = SystemTime::now()
199                    .checked_add(ttl_duration).unwrap()
200                    .duration_since(UNIX_EPOCH).unwrap();
201                ttl_since_the_epoch.as_secs()
202            }
203        };
204        let (new_header, new_packet) = packet_marshal(key, value, expire)?;
205        let mut new_offset = 0;
206        let mut key_old_offset = 0;
207
208        if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
209            for offset_and_size in offset_and_size_vec {
210                let old_offset = offset_and_size.offset;
211                let old_size = offset_and_size.size();
212                let mut old_packet = vec![0; old_size];
213                self.file.read_exact_at(&mut old_packet, io::SeekFrom::Start(old_offset))?;
214                let (old_header, old_key, _) = packet_unmarshal(&old_packet);
215
216                if key.len() != old_key.len() || key != old_key {
217                    continue;
218                }
219
220                key_old_offset = old_offset;
221
222                if old_header.sizeb == new_header.sizeb {
223                    //overwrite
224                    new_offset = old_offset;
225                } else {
226                    //mark old as deleted
227                    let status_deleted = [STATUS_DELETED];
228                    self.file.write_at(&status_deleted, io::SeekFrom::Start(old_offset + 1))?;
229                    self.holes.insert(old_offset, old_header.sizeb);
230
231                    //try to find empty hole
232                    if let Some(hole_offset) = self.find_hole(new_header.sizeb) {
233                        new_offset = hole_offset;
234                        self.holes.remove(&hole_offset);
235                    }
236                }
237
238                break;
239            }
240        } else {
241            self.map.insert(hash, Vec::with_capacity(1));
242        }
243
244        let new_offset_seek = if new_offset == 0 {
245            io::SeekFrom::End(0)
246        } else {
247            io::SeekFrom::Start(new_offset)
248        };
249
250        new_offset = self.file.write_at(&new_packet, new_offset_seek)?;
251        let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
252
253        if new_offset != key_old_offset {
254            if key_old_offset != 0 {
255                offset_and_size_vec.retain(|x| x.offset != key_old_offset);
256            }
257
258            offset_and_size_vec.push(OffsetAndSize { offset: new_offset, sizeb: new_header.sizeb });
259        }
260
261        Ok(())
262    }
263
264    fn delete (&mut self, key: &[u8], hash: u32) -> io::Result<bool> {
265        let mut key_packet_offset = 0;
266        let mut key_packet_sizeb = 0;
267
268        let mut checked_packet_keys = Vec::new();
269
270        if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
271            for offset_and_size in offset_and_size_vec {
272                let offset = offset_and_size.offset;
273                let size = offset_and_size.size();
274                let mut packet = vec![0; size];
275                self.file.read_exact_at(&mut packet, io::SeekFrom::Start(offset))?;
276                let (packet_header, packet_key, _) = packet_unmarshal(&packet);
277
278                if key.len() != packet_key.len() || key != packet_key {
279                    checked_packet_keys.push(packet_key.to_vec());
280                    continue;
281                }
282
283                key_packet_offset = offset;
284                key_packet_sizeb = packet_header.sizeb;
285
286                break;
287            }
288        }
289
290        if key_packet_offset != 0 && key_packet_sizeb != 0 {
291            let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
292            offset_and_size_vec.retain(|x| x.offset != key_packet_offset);
293            let status_deleted = [STATUS_DELETED];
294            self.file.write_at(&status_deleted, io::SeekFrom::Start(key_packet_offset + 1))?;
295            self.holes.insert(key_packet_offset, key_packet_sizeb);
296            
297            Ok(true)
298        }
299        else {
300            Ok(false)
301        }
302    }
303
304    fn all_hash(&self) -> Vec<u32> {
305        let mut all_hash = Vec::with_capacity(self.map.len());
306
307        for (hash, _) in &self.map {
308            all_hash.push(*hash)
309        }
310
311        all_hash
312    }
313
314    fn delete_expired(&mut self, hash: u32) -> io::Result<()> {
315        let expired: io::Result<Vec<(u32, u64, u8)>> = {
316            let mut expired = Vec::new();
317            
318            if let Some(offset_and_size_vec) = self.map.get_mut(&hash) {
319                for offset_and_size in offset_and_size_vec {
320                    let offset = offset_and_size.offset;
321                    let sizeb = offset_and_size.sizeb;
322                    let mut header_buffer = [0u8; HEADER_SIZE];
323                    self.file.read_exact_at(&mut header_buffer, io::SeekFrom::Start(offset))?;
324                    let header = Header::parse(header_buffer);
325                    
326                    if header.is_deleted() {
327                        continue;
328                    }
329
330                    if header.is_expired() {
331                        expired.push((hash, offset, sizeb));
332                    }
333                }
334            }
335
336            Ok(expired)
337        };
338
339        for (hash, offset, sizeb) in expired? {
340            let status_deleted = [STATUS_DELETED];
341            self.file.write_at(&status_deleted, io::SeekFrom::Start(offset + 1))?;
342            let offset_and_size_vec = self.map.get_mut(&hash).unwrap();
343            offset_and_size_vec.retain(|x| x.offset != offset);
344            self.holes.insert(offset, sizeb);
345        }
346
347        Ok(())
348    }
349
350    fn find_hole(&self, sizeb: u8) -> Option<u64> {
351        let holes = &self.holes;
352
353        for (hole_offset, hole_sizeb) in holes {
354            if *hole_sizeb == sizeb {
355                return Some(*hole_offset);
356            }
357        }
358
359        None
360    }
361}
362
363fn packet_marshal(key: &[u8], value: &[u8], expire: u64) -> io::Result<(Header, Vec<u8>)> {
364    let header = Header::make(key, value, expire);
365
366    if header.sizeb > MAX_PACKET_SIZEB {
367        return Err(io::Error::new(io::ErrorKind::InvalidInput, "Exceeded the maximum package size".to_string()))
368    }
369
370    let size = header.size();
371    let mut buffer = vec![0; size];
372    header.write_to(&mut buffer[0..size]);
373    buffer[HEADER_SIZE..HEADER_SIZE + (header.key_len as usize)].copy_from_slice(key);
374    buffer[HEADER_SIZE + (header.key_len as usize)..HEADER_SIZE + (header.key_len as usize) + (header.val_len as usize)].copy_from_slice(value);
375
376    Ok((header, buffer))
377}
378
379fn packet_unmarshal(packet: &[u8]) -> (Header, &[u8], &[u8]) {
380    let mut header_buffer = [0u8; HEADER_SIZE];
381    header_buffer.copy_from_slice(&packet[0..HEADER_SIZE]);
382    let header = Header::parse(header_buffer);
383    let key = &packet[HEADER_SIZE..HEADER_SIZE + (header.key_len as usize)];
384    let value = &packet[HEADER_SIZE + (header.key_len as usize)..HEADER_SIZE + (header.key_len as usize) + (header.val_len as usize)];
385
386    (header, key, value)
387}
388
/// A chunk file plus its in-memory index, guarded by an `RwLock`.
///
/// The public methods take the read or write lock as needed and delegate to
/// the wrapped `ChunkUnsafe`.
pub struct Chunk {
    chunk_unsafe: RwLock<ChunkUnsafe>
}
392
393impl Chunk {
394    pub fn init(file_path: &Path) -> io::Result<Chunk> {
395        let mut file = OpenOptions::new()
396                            .read(true)
397                            .write(true)
398                            .create(true)
399                            .open(file_path)?;
400        file.sync_all()?;
401        let mut map = HashMap::new();
402        let mut holes = HashMap::new();
403        let file_metadata = file.metadata()?;
404    
405        if file_metadata.len() == 0 {
406            file.write(&[VERSION_MARKER, CHUNK_VERSION])?;
407        }
408        else {
409            let chunk_version = read_chunk_version(&mut file)?;
410    
411            if chunk_version != 1 {
412                return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Unknown chunk version = {} in file", chunk_version)))
413            }
414    
415            let mut current_offset = 2;
416    
417            loop {
418                let option_header = read_header(&mut file)?;
419    
420                match option_header {
421                    None => break,
422                    Some(header) => {
423                        // read key
424                        let key_len = header.key_len as usize;
425                        let mut key = vec![0; key_len];
426                        file.read_exact(&mut key)?;
427    
428                        // skip
429                        let size = header.size();
430                        let new_offset = file.seek(io::SeekFrom::Current(((size as u32) - (header.key_len as u32) - (HEADER_SIZE as u32)) as i64))?;
431    
432                        // map store
433                        if header.is_deleted() || header.is_expired() {
434                            holes.insert(current_offset, header.sizeb);
435                        } else {
436                            let hash = hashes::hash(&key);
437    
438                            if map.contains_key(&hash) == false {
439                                map.insert(hash, Vec::new());
440                            }
441    
442                            let offset_and_size_vec = map.get_mut(&hash).unwrap();
443                            offset_and_size_vec.push(OffsetAndSize { offset: current_offset, sizeb: header.sizeb });
444                        }
445                        
446                        current_offset = new_offset;
447                    },
448                }
449            }
450        }
451    
452        let chunk_file = ChunkFile {
453            file: Mutex::new(file)
454        };
455        let chunk_unsafe = ChunkUnsafe { 
456            file: chunk_file, 
457            map, 
458            holes, 
459            need_sync: false 
460        };
461        let chunk = Chunk { 
462            chunk_unsafe: RwLock::new(chunk_unsafe) 
463        };
464    
465        Ok(chunk)
466    }
467
468    pub fn sync(&self) -> io::Result<()> {
469        let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
470        chunk_unsafe.sync()?;
471        Ok(())
472    }
473
474    pub fn get(&self, key: &[u8], hash: u32) -> io::Result<Option<Vec<u8>>> {
475        let result = {
476            let chunk_unsafe_read = self.chunk_unsafe.read().unwrap();
477            chunk_unsafe_read.get(key, hash)?
478        };
479
480        match result {
481            GetResult::Ok(value) => Ok(Some(value)),
482            GetResult::NotFound => Ok(None),
483            GetResult::Expired => {
484                let mut chunk_unsafe_write = self.chunk_unsafe.write().unwrap();
485                chunk_unsafe_write.delete(key, hash)?;
486                Ok(None)
487            },
488        }
489    }
490
491    pub fn set(&self, key: &[u8], value: &[u8], hash: u32, ttl_sec: u32) -> io::Result<()> {
492        let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
493        chunk_unsafe.set(key, value, hash, ttl_sec)
494    }
495
496    pub fn delete (&self, key: &[u8], hash: u32) -> io::Result<bool> {
497        let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
498        chunk_unsafe.delete(key, hash)
499    }
500
501    pub fn delete_expired(&self) -> io::Result<()> {
502        let all_hash = {
503            let chunk_unsafe_read = self.chunk_unsafe.read().unwrap();
504            chunk_unsafe_read.all_hash()
505        };
506
507        for hash in all_hash {
508            let mut chunk_unsafe = self.chunk_unsafe.write().unwrap();
509            chunk_unsafe.delete_expired(hash)?;
510        }
511
512        Ok(())
513    }
514}
515
516fn read_chunk_version(file: &mut File) -> io::Result<u8> {
517    let mut chunk_version_buffer = [0, 0];
518    file.read_exact(&mut chunk_version_buffer)?;
519
520    if chunk_version_buffer[0] == VERSION_MARKER {
521        Ok(chunk_version_buffer[1])
522    }
523    else {
524        Ok(0)
525    }
526}