rustfs_rio/
compress_index.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::Bytes;
16use serde::{Deserialize, Serialize};
17use std::io::{self, Read, Seek, SeekFrom};
18
/// Magic bytes marking the start of an index chunk payload.
const S2_INDEX_HEADER: &[u8] = b"s2idx\x00";
/// Magic bytes marking the end of an index chunk (the header reversed).
const S2_INDEX_TRAILER: &[u8] = b"\x00xdi2s";
/// Hard cap on the number of entries kept in an index.
const MAX_INDEX_ENTRIES: usize = 1 << 16;
/// Minimum spacing, in uncompressed bytes, between two index entries (1 MiB).
const MIN_INDEX_DIST: i64 = 1 << 20;
// const MIN_INDEX_DIST: i64 = 0;
24
/// Implemented by readers that may carry a compression seek [`Index`].
pub trait TryGetIndex {
    /// Returns the index describing this stream, if one is available.
    /// The default implementation reports no index.
    fn try_get_index(&self) -> Option<&Index> {
        None
    }
}
30
/// Seek index mapping uncompressed stream offsets to compressed offsets,
/// modeled on the S2/Snappy seek-index format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Index {
    /// Total uncompressed bytes covered by the index; -1 until known.
    pub total_uncompressed: i64,
    /// Total compressed bytes covered by the index; -1 until known.
    pub total_compressed: i64,
    // Offset pairs, ordered by ascending offset.
    info: Vec<IndexInfo>,
    // Estimated uncompressed bytes per block; used to delta-encode offsets.
    est_block_uncomp: i64,
}
38
impl Default for Index {
    /// Equivalent to [`Index::new`]: an empty index with unknown totals.
    fn default() -> Self {
        Self::new()
    }
}
44
/// A single index entry: a compressed/uncompressed offset pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexInfo {
    /// Byte offset in the compressed stream.
    pub compressed_offset: i64,
    /// Byte offset in the uncompressed stream.
    pub uncompressed_offset: i64,
}
50
51#[allow(dead_code)]
52impl Index {
53    pub fn new() -> Self {
54        Self {
55            total_uncompressed: -1,
56            total_compressed: -1,
57            info: Vec::new(),
58            est_block_uncomp: 0,
59        }
60    }
61
62    #[allow(dead_code)]
63    fn reset(&mut self, max_block: usize) {
64        self.est_block_uncomp = max_block as i64;
65        self.total_compressed = -1;
66        self.total_uncompressed = -1;
67        self.info.clear();
68    }
69
70    pub fn len(&self) -> usize {
71        self.info.len()
72    }
73
74    fn alloc_infos(&mut self, n: usize) {
75        if n > MAX_INDEX_ENTRIES {
76            panic!("n > MAX_INDEX_ENTRIES");
77        }
78        self.info = Vec::with_capacity(n);
79    }
80
81    pub fn add(&mut self, compressed_offset: i64, uncompressed_offset: i64) -> io::Result<()> {
82        if self.info.is_empty() {
83            self.info.push(IndexInfo {
84                compressed_offset,
85                uncompressed_offset,
86            });
87            return Ok(());
88        }
89
90        let last_idx = self.info.len() - 1;
91        let latest = &mut self.info[last_idx];
92
93        if latest.uncompressed_offset == uncompressed_offset {
94            latest.compressed_offset = compressed_offset;
95            return Ok(());
96        }
97
98        if latest.uncompressed_offset > uncompressed_offset {
99            return Err(io::Error::new(
100                io::ErrorKind::InvalidData,
101                format!(
102                    "internal error: Earlier uncompressed received ({} > {})",
103                    latest.uncompressed_offset, uncompressed_offset
104                ),
105            ));
106        }
107
108        if latest.compressed_offset > compressed_offset {
109            return Err(io::Error::new(
110                io::ErrorKind::InvalidData,
111                format!(
112                    "internal error: Earlier compressed received ({} > {})",
113                    latest.uncompressed_offset, uncompressed_offset
114                ),
115            ));
116        }
117
118        if latest.uncompressed_offset + MIN_INDEX_DIST > uncompressed_offset {
119            return Ok(());
120        }
121
122        self.info.push(IndexInfo {
123            compressed_offset,
124            uncompressed_offset,
125        });
126
127        self.total_compressed = compressed_offset;
128        self.total_uncompressed = uncompressed_offset;
129        Ok(())
130    }
131
    /// Returns the `(compressed_offset, uncompressed_offset)` of the last
    /// index entry at or before `offset` (an uncompressed stream position).
    ///
    /// A negative `offset` is interpreted relative to the end of the
    /// stream (`total_uncompressed + offset`).
    ///
    /// # Errors
    /// `Other` ("corrupt index") when totals were never recorded;
    /// `UnexpectedEof` when `offset` is out of bounds or the index is empty.
    pub fn find(&self, offset: i64) -> io::Result<(i64, i64)> {
        if self.total_uncompressed < 0 {
            return Err(io::Error::other("corrupt index"));
        }

        let mut offset = offset;
        if offset < 0 {
            // Negative offsets address from the end of the stream.
            offset += self.total_uncompressed;
            if offset < 0 {
                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "offset out of bounds"));
            }
        }

        if offset > self.total_uncompressed {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "offset out of bounds"));
        }

        if self.info.is_empty() {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "empty index"));
        }

        // Large index: binary-search for the partition point. The comparator
        // never answers `Equal`, so `binary_search_by` always returns `Err(i)`
        // where `i` is the index of the first entry past `offset`;
        // `unwrap_or_else(|i| i)` extracts that insertion point.
        if self.info.len() > 200 {
            let n = self
                .info
                .binary_search_by(|info| {
                    if info.uncompressed_offset > offset {
                        std::cmp::Ordering::Greater
                    } else {
                        std::cmp::Ordering::Less
                    }
                })
                .unwrap_or_else(|i| i);

            if n == 0 {
                return Ok((self.info[0].compressed_offset, self.info[0].uncompressed_offset));
            }
            // The entry just before the partition point is the last one <= offset.
            return Ok((self.info[n - 1].compressed_offset, self.info[n - 1].uncompressed_offset));
        }

        // Small index: a linear scan is cheaper than the search overhead.
        let mut compressed_off = 0;
        let mut uncompressed_off = 0;
        for info in &self.info {
            if info.uncompressed_offset > offset {
                break;
            }
            compressed_off = info.compressed_offset;
            uncompressed_off = info.uncompressed_offset;
        }
        Ok((compressed_off, uncompressed_off))
    }
182
    /// Shrinks the index so it fits within `MAX_INDEX_ENTRIES` and each
    /// retained entry spans at least `MIN_INDEX_DIST` uncompressed bytes,
    /// by keeping one entry out of every `remove_n + 1`.
    fn reduce(&mut self) {
        // Nothing to do while the index is small and blocks are already large.
        if self.info.len() < MAX_INDEX_ENTRIES && self.est_block_uncomp >= MIN_INDEX_DIST {
            return;
        }

        // Drop `remove_n` entries after each kept one.
        let mut remove_n = (self.info.len() + 1) / MAX_INDEX_ENTRIES;
        let src = self.info.clone();
        let mut j = 0;

        // Raise the stride until the effective block size reaches
        // MIN_INDEX_DIST, but never reduce below ~1000 entries.
        while self.est_block_uncomp * (remove_n as i64 + 1) < MIN_INDEX_DIST && self.info.len() / (remove_n + 1) > 1000 {
            remove_n += 1;
        }

        // Compact in place: copy every (remove_n + 1)-th entry forward.
        // j only ever trails idx, so the writes stay in bounds.
        let mut idx = 0;
        while idx < src.len() {
            self.info[j] = src[idx].clone();
            j += 1;
            idx += remove_n + 1;
        }
        self.info.truncate(j);
        // Each kept entry now covers (remove_n + 1) former blocks.
        self.est_block_uncomp += self.est_block_uncomp * remove_n as i64;
    }
205
206    pub fn into_vec(mut self) -> Bytes {
207        let mut b = Vec::new();
208        self.append_to(&mut b, self.total_uncompressed, self.total_compressed);
209        Bytes::from(b)
210    }
211
212    pub fn append_to(&mut self, b: &mut Vec<u8>, uncomp_total: i64, comp_total: i64) {
213        self.reduce();
214        let init_size = b.len();
215
216        // Add skippable header
217        b.extend_from_slice(&[0x50, 0x2A, 0x4D, 0x18]); // ChunkTypeIndex
218        b.extend_from_slice(&[0, 0, 0]); // Placeholder for chunk length
219
220        // Add header
221        b.extend_from_slice(S2_INDEX_HEADER);
222
223        // Add total sizes
224        let mut tmp = [0u8; 8];
225        let n = write_varint(&mut tmp, uncomp_total);
226        b.extend_from_slice(&tmp[..n]);
227        let n = write_varint(&mut tmp, comp_total);
228        b.extend_from_slice(&tmp[..n]);
229        let n = write_varint(&mut tmp, self.est_block_uncomp);
230        b.extend_from_slice(&tmp[..n]);
231        let n = write_varint(&mut tmp, self.info.len() as i64);
232        b.extend_from_slice(&tmp[..n]);
233
234        // Check if we should add uncompressed offsets
235        let mut has_uncompressed = 0u8;
236        for (idx, info) in self.info.iter().enumerate() {
237            if idx == 0 {
238                if info.uncompressed_offset != 0 {
239                    has_uncompressed = 1;
240                    break;
241                }
242                continue;
243            }
244            if info.uncompressed_offset != self.info[idx - 1].uncompressed_offset + self.est_block_uncomp {
245                has_uncompressed = 1;
246                break;
247            }
248        }
249        b.push(has_uncompressed);
250
251        // Add uncompressed offsets if needed
252        if has_uncompressed == 1 {
253            for (idx, info) in self.info.iter().enumerate() {
254                let mut u_off = info.uncompressed_offset;
255                if idx > 0 {
256                    let prev = &self.info[idx - 1];
257                    u_off -= prev.uncompressed_offset + self.est_block_uncomp;
258                }
259                let n = write_varint(&mut tmp, u_off);
260                b.extend_from_slice(&tmp[..n]);
261            }
262        }
263
264        // Add compressed offsets
265        let mut c_predict = self.est_block_uncomp / 2;
266        for (idx, info) in self.info.iter().enumerate() {
267            let mut c_off = info.compressed_offset;
268            if idx > 0 {
269                let prev = &self.info[idx - 1];
270                c_off -= prev.compressed_offset + c_predict;
271                c_predict += c_off / 2;
272            }
273            let n = write_varint(&mut tmp, c_off);
274            b.extend_from_slice(&tmp[..n]);
275        }
276
277        // Add total size and trailer
278        let total_size = (b.len() - init_size + 4 + S2_INDEX_TRAILER.len()) as u32;
279        b.extend_from_slice(&total_size.to_le_bytes());
280        b.extend_from_slice(S2_INDEX_TRAILER);
281
282        // Update chunk length
283        let chunk_len = b.len() - init_size - 4;
284        b[init_size + 1] = chunk_len as u8;
285        b[init_size + 2] = (chunk_len >> 8) as u8;
286        b[init_size + 3] = (chunk_len >> 16) as u8;
287    }
288
289    pub fn load<'a>(&mut self, mut b: &'a [u8]) -> io::Result<&'a [u8]> {
290        if b.len() <= 4 + S2_INDEX_HEADER.len() + S2_INDEX_TRAILER.len() {
291            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "buffer too small"));
292        }
293
294        if b[0] != 0x50 || b[1] != 0x2A || b[2] != 0x4D || b[3] != 0x18 {
295            return Err(io::Error::other("invalid chunk type"));
296        }
297
298        let chunk_len = (b[1] as usize) | ((b[2] as usize) << 8) | ((b[3] as usize) << 16);
299        b = &b[4..];
300
301        if b.len() < chunk_len {
302            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "buffer too small"));
303        }
304
305        if !b.starts_with(S2_INDEX_HEADER) {
306            return Err(io::Error::other("invalid header"));
307        }
308        b = &b[S2_INDEX_HEADER.len()..];
309
310        // Read total uncompressed
311        let (v, n) = read_varint(b)?;
312        if v < 0 {
313            return Err(io::Error::other("invalid uncompressed size"));
314        }
315        self.total_uncompressed = v;
316        b = &b[n..];
317
318        // Read total compressed
319        let (v, n) = read_varint(b)?;
320        if v < 0 {
321            return Err(io::Error::other("invalid compressed size"));
322        }
323        self.total_compressed = v;
324        b = &b[n..];
325
326        // Read est block uncomp
327        let (v, n) = read_varint(b)?;
328        if v < 0 {
329            return Err(io::Error::other("invalid block size"));
330        }
331        self.est_block_uncomp = v;
332        b = &b[n..];
333
334        // Read number of entries
335        let (v, n) = read_varint(b)?;
336        if v < 0 || v > MAX_INDEX_ENTRIES as i64 {
337            return Err(io::Error::other("invalid number of entries"));
338        }
339        let entries = v as usize;
340        b = &b[n..];
341
342        self.alloc_infos(entries);
343
344        if b.is_empty() {
345            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "buffer too small"));
346        }
347
348        let has_uncompressed = b[0];
349        b = &b[1..];
350
351        if has_uncompressed & 1 != has_uncompressed {
352            return Err(io::Error::other("invalid uncompressed flag"));
353        }
354
355        // Read uncompressed offsets
356        for idx in 0..entries {
357            let mut u_off = 0i64;
358            if has_uncompressed != 0 {
359                let (v, n) = read_varint(b)?;
360                u_off = v;
361                b = &b[n..];
362            }
363
364            if idx > 0 {
365                let prev = self.info[idx - 1].uncompressed_offset;
366                u_off += prev + self.est_block_uncomp;
367                if u_off <= prev {
368                    return Err(io::Error::other("invalid offset"));
369                }
370            }
371            if u_off < 0 {
372                return Err(io::Error::other("negative offset"));
373            }
374            self.info[idx].uncompressed_offset = u_off;
375        }
376
377        // Read compressed offsets
378        let mut c_predict = self.est_block_uncomp / 2;
379        for idx in 0..entries {
380            let (v, n) = read_varint(b)?;
381            let mut c_off = v;
382            b = &b[n..];
383
384            if idx > 0 {
385                c_predict += c_off / 2;
386                let prev = self.info[idx - 1].compressed_offset;
387                c_off += prev + c_predict;
388                if c_off <= prev {
389                    return Err(io::Error::other("invalid offset"));
390                }
391            }
392            if c_off < 0 {
393                return Err(io::Error::other("negative offset"));
394            }
395            self.info[idx].compressed_offset = c_off;
396        }
397
398        if b.len() < 4 + S2_INDEX_TRAILER.len() {
399            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "buffer too small"));
400        }
401
402        // Skip size
403        b = &b[4..];
404
405        // Check trailer
406        if !b.starts_with(S2_INDEX_TRAILER) {
407            return Err(io::Error::other("invalid trailer"));
408        }
409
410        Ok(&b[S2_INDEX_TRAILER.len()..])
411    }
412
413    pub fn load_stream<R: Read + Seek>(&mut self, mut rs: R) -> io::Result<()> {
414        // Go to end
415        rs.seek(SeekFrom::End(-10))?;
416        let mut tmp = [0u8; 10];
417        rs.read_exact(&mut tmp)?;
418
419        // Check trailer
420        if &tmp[4..4 + S2_INDEX_TRAILER.len()] != S2_INDEX_TRAILER {
421            return Err(io::Error::other("invalid trailer"));
422        }
423
424        let sz = u32::from_le_bytes(tmp[..4].try_into().unwrap());
425        if sz > 0x7fffffff {
426            return Err(io::Error::other("size too large"));
427        }
428
429        rs.seek(SeekFrom::End(-(sz as i64)))?;
430
431        let mut buf = vec![0u8; sz as usize];
432        rs.read_exact(&mut buf)?;
433
434        self.load(&buf)?;
435        Ok(())
436    }
437
438    pub fn to_json(&self) -> serde_json::Result<Vec<u8>> {
439        #[derive(Serialize)]
440        struct Offset {
441            compressed: i64,
442            uncompressed: i64,
443        }
444
445        #[derive(Serialize)]
446        struct IndexJson {
447            total_uncompressed: i64,
448            total_compressed: i64,
449            offsets: Vec<Offset>,
450            est_block_uncompressed: i64,
451        }
452
453        let json = IndexJson {
454            total_uncompressed: self.total_uncompressed,
455            total_compressed: self.total_compressed,
456            offsets: self
457                .info
458                .iter()
459                .map(|info| Offset {
460                    compressed: info.compressed_offset,
461                    uncompressed: info.uncompressed_offset,
462                })
463                .collect(),
464            est_block_uncompressed: self.est_block_uncomp,
465        };
466
467        serde_json::to_vec_pretty(&json)
468    }
469}
470
471// Helper functions for varint encoding/decoding
/// Encodes `v` as an unsigned LEB128 varint of its two's-complement bit
/// pattern, returning the number of bytes written (1..=10).
///
/// Negative values always occupy 10 bytes, which `read_varint` reconstructs
/// exactly. BUG FIX: the previous implementation compared the *signed* value
/// against 0x80, so any negative input was truncated to a single byte with
/// the continuation bit set — losing data and desynchronizing the reader.
///
/// # Panics
/// Panics if `buf` is shorter than the encoding (callers pass a 10-byte
/// scratch buffer).
fn write_varint(buf: &mut [u8], v: i64) -> usize {
    let mut v = v as u64;
    let mut n = 0;
    while v >= 0x80 {
        buf[n] = (v as u8) | 0x80;
        v >>= 7;
        n += 1;
    }
    buf[n] = v as u8;
    n + 1
}
482
/// Decodes an unsigned LEB128 varint into an `i64` (two's complement),
/// returning the value and the number of bytes consumed.
///
/// BUG FIX: the shift amount is now bounded. The previous version kept
/// shifting by 7 indefinitely, so a malformed input with ten or more
/// continuation bytes pushed the shift past 63 and panicked (shift
/// overflow) in debug builds.
///
/// # Errors
/// `UnexpectedEof` when `buf` ends mid-varint; `InvalidData` for an
/// encoding longer than 10 bytes.
fn read_varint(buf: &[u8]) -> io::Result<(i64, usize)> {
    let mut result = 0u64;
    let mut shift = 0u32;
    let mut n = 0;

    while n < buf.len() {
        let byte = buf[n];
        n += 1;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte < 0x80 {
            return Ok((result as i64, n));
        }
        shift += 7;
        if shift > 63 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too long"));
        }
    }

    Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"))
}
500
501// Helper functions for index header manipulation
502#[allow(dead_code)]
503pub fn remove_index_headers(b: &[u8]) -> Option<&[u8]> {
504    if b.len() < 4 + S2_INDEX_TRAILER.len() {
505        return None;
506    }
507
508    // Skip size
509    let b = &b[4..];
510
511    // Check trailer
512    if !b.starts_with(S2_INDEX_TRAILER) {
513        return None;
514    }
515
516    Some(&b[S2_INDEX_TRAILER.len()..])
517}
518
519#[allow(dead_code)]
520pub fn restore_index_headers(in_data: &[u8]) -> Vec<u8> {
521    if in_data.is_empty() {
522        return Vec::new();
523    }
524
525    let mut b = Vec::with_capacity(4 + S2_INDEX_HEADER.len() + in_data.len() + S2_INDEX_TRAILER.len() + 4);
526    b.extend_from_slice(&[0x50, 0x2A, 0x4D, 0x18]);
527    b.extend_from_slice(S2_INDEX_HEADER);
528    b.extend_from_slice(in_data);
529
530    let total_size = (b.len() + 4 + S2_INDEX_TRAILER.len()) as u32;
531    b.extend_from_slice(&total_size.to_le_bytes());
532    b.extend_from_slice(S2_INDEX_TRAILER);
533
534    let chunk_len = b.len() - 4;
535    b[1] = chunk_len as u8;
536    b[2] = (chunk_len >> 8) as u8;
537    b[3] = (chunk_len >> 16) as u8;
538
539    b
540}
541
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_index_new() {
        let index = Index::new();
        assert_eq!(index.total_uncompressed, -1);
        assert_eq!(index.total_compressed, -1);
        assert!(index.info.is_empty());
        assert_eq!(index.est_block_uncomp, 0);
    }

    #[test]
    fn test_index_add() -> io::Result<()> {
        let mut index = Index::new();

        // Adding the first entry always succeeds.
        index.add(100, 1000)?;
        assert_eq!(index.info.len(), 1);
        assert_eq!(index.info[0].compressed_offset, 100);
        assert_eq!(index.info[0].uncompressed_offset, 1000);

        // Re-adding the same uncompressed offset updates the entry in place.
        index.add(200, 1000)?;
        assert_eq!(index.info.len(), 1);
        assert_eq!(index.info[0].compressed_offset, 200);
        assert_eq!(index.info[0].uncompressed_offset, 1000);

        // Adding a new entry (spacing large enough to pass MIN_INDEX_DIST).
        index.add(300, 2000 + MIN_INDEX_DIST)?;
        assert_eq!(index.info.len(), 2);
        assert_eq!(index.info[1].compressed_offset, 300);
        assert_eq!(index.info[1].uncompressed_offset, 2000 + MIN_INDEX_DIST);

        Ok(())
    }

    #[test]
    fn test_index_add_errors() {
        let mut index = Index::new();

        // Seed an initial entry.
        index.add(100, 1000).unwrap();

        // A smaller uncompressed offset must be rejected.
        let err = index.add(200, 500).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);

        // A smaller compressed offset must be rejected.
        let err = index.add(50, 2000).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_index_find() -> io::Result<()> {
        let mut index = Index::new();
        index.total_uncompressed = 1000 + MIN_INDEX_DIST * 3;
        index.total_compressed = 5000;

        // Add test data; entry spacing satisfies MIN_INDEX_DIST.
        index.add(100, 1000)?;
        index.add(300, 1000 + MIN_INDEX_DIST)?;
        index.add(500, 1000 + MIN_INDEX_DIST * 2)?;

        // Look up an offset that falls inside the first block.
        let (comp, uncomp) = index.find(1500)?;
        assert_eq!(comp, 100);
        assert_eq!(uncomp, 1000);

        // Look up an exact entry boundary.
        let (comp, uncomp) = index.find(1000 + MIN_INDEX_DIST)?;
        assert_eq!(comp, 300);
        assert_eq!(uncomp, 1000 + MIN_INDEX_DIST);

        // Look up the last entry.
        let (comp, uncomp) = index.find(1000 + MIN_INDEX_DIST * 2)?;
        assert_eq!(comp, 500);
        assert_eq!(uncomp, 1000 + MIN_INDEX_DIST * 2);

        Ok(())
    }

    #[test]
    fn test_index_find_errors() {
        let mut index = Index::new();
        index.total_uncompressed = 10000;
        index.total_compressed = 5000;

        // An index whose totals were never set reports corruption.
        let uninit_index = Index::new();
        let err = uninit_index.find(1000).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);

        // An offset past the recorded total is out of bounds.
        let err = index.find(15000).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);

        // A negative offset resolves from the end; with no entries this
        // still fails with UnexpectedEof.
        let err = match index.find(-1000) {
            Ok(_) => panic!("should be error"),
            Err(e) => e,
        };
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_index_reduce() {
        let mut index = Index::new();
        index.est_block_uncomp = MIN_INDEX_DIST;

        // Add more than the maximum number of entries, spaced exactly
        // MIN_INDEX_DIST apart so none are skipped.
        for i in 0..MAX_INDEX_ENTRIES + 100 {
            index.add(i as i64 * 100, i as i64 * MIN_INDEX_DIST).unwrap();
        }

        // Invoke reduce directly.
        index.reduce();

        // The entry count must now fit within the cap.
        assert!(index.info.len() <= MAX_INDEX_ENTRIES);
    }

    #[test]
    fn test_index_json() -> io::Result<()> {
        let mut index = Index::new();

        // Add some test data.
        index.add(100, 1000)?;
        index.add(300, 2000 + MIN_INDEX_DIST)?;

        // Serialize to JSON.
        let json = index.to_json().unwrap();
        let json_str = String::from_utf8(json).unwrap();

        println!("json_str: {json_str}");
        // Verify the JSON content.

        assert!(json_str.contains("\"compressed\": 100"));
        assert!(json_str.contains("\"uncompressed\": 1000"));
        assert!(json_str.contains("\"est_block_uncompressed\": 0"));

        Ok(())
    }
}