rustfs_filemeta/
fileinfo.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::error::{Error, Result};
16use crate::headers::RESERVED_METADATA_PREFIX_LOWER;
17use crate::headers::RUSTFS_HEALING;
18use bytes::Bytes;
19use rmp_serde::Serializer;
20use rustfs_utils::HashAlgorithm;
21use serde::Deserialize;
22use serde::Serialize;
23use std::collections::HashMap;
24use time::OffsetDateTime;
25use uuid::Uuid;
26
/// Name of the erasure-coding algorithm. `FileInfo::new` stores it in
/// `ErasureInfo::algorithm` and `ErasureAlgo::ReedSolomon` displays as this string.
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
/// Erasure block size in bytes (1 MiB); `FileInfo::new` stores it in `ErasureInfo::block_size`.
pub const BLOCK_SIZE_V2: usize = 1024 * 1024; // 1M

// Additional constants from Go version
/// Literal `"null"` version id.
/// NOTE(review): not referenced in this file — presumably names the unversioned
/// ("null") version of an object; confirm against callers.
pub const NULL_VERSION_ID: &str = "null";
// pub const RUSTFS_ERASURE_UPGRADED: &str = "x-rustfs-internal-erasure-upgraded";

/// Metadata key suffix (appended to `RESERVED_METADATA_PREFIX_LOWER`) under which
/// `FileInfo::set_tier_free_version_id` stores the tier free-version id.
pub const TIER_FV_ID: &str = "tier-free-versionID";
/// Metadata key suffix (appended to `RESERVED_METADATA_PREFIX_LOWER`) whose presence
/// is reported by `FileInfo::tier_free_version`.
pub const TIER_FV_MARKER: &str = "tier-free-marker";
/// Metadata key suffix (appended to `RESERVED_METADATA_PREFIX_LOWER`) whose presence
/// is reported by `FileInfo::skip_tier_free_version`.
pub const TIER_SKIP_FV_ID: &str = "tier-skip-fvid";
37
/// Metadata describing a single part of an object.
///
/// `FileInfo::parts` holds these entries sorted by `number`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ObjectPartInfo {
    /// ETag recorded for this part.
    pub etag: String,
    /// Part number; `FileInfo::add_object_part` uses it to replace or order parts.
    pub number: usize,
    /// Size of the part; `FileInfo::to_part_offset` subtracts it while walking parts.
    /// NOTE(review): presumably the stored (possibly transformed) size in bytes — confirm.
    pub size: usize,
    pub actual_size: i64, // Original data size
    /// Modification time of the part, if known.
    pub mod_time: Option<OffsetDateTime>,
    // Index holds the index of the part in the erasure coding
    pub index: Option<Bytes>,
    // Checksums holds checksums of the part
    pub checksums: Option<HashMap<String, String>>,
}
50
/// ChecksumInfo - carries checksums of individual scattered parts per disk.
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
pub struct ChecksumInfo {
    /// Number of the part this checksum belongs to; `ErasureInfo::get_checksum_info`
    /// matches on this field.
    pub part_number: usize,
    /// Hash algorithm associated with `hash`.
    pub algorithm: HashAlgorithm,
    /// Raw checksum bytes.
    pub hash: Bytes,
}
58
/// Erasure-coding algorithm identifier with an explicit numeric value per variant.
///
/// The derived ordering follows the discriminants, so `Invalid` sorts below
/// every real algorithm.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Default, Clone)]
pub enum ErasureAlgo {
    /// Unset / unknown algorithm; this is the `Default` value.
    #[default]
    Invalid = 0,
    /// Reed-Solomon erasure coding; displayed as `ERASURE_ALGORITHM`.
    ReedSolomon = 1,
}
65
66impl ErasureAlgo {
67    pub fn valid(&self) -> bool {
68        *self > ErasureAlgo::Invalid
69    }
70    pub fn to_u8(&self) -> u8 {
71        match self {
72            ErasureAlgo::Invalid => 0,
73            ErasureAlgo::ReedSolomon => 1,
74        }
75    }
76
77    pub fn from_u8(u: u8) -> Self {
78        match u {
79            1 => ErasureAlgo::ReedSolomon,
80            _ => ErasureAlgo::Invalid,
81        }
82    }
83}
84
85impl std::fmt::Display for ErasureAlgo {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        match self {
88            ErasureAlgo::Invalid => write!(f, "Invalid"),
89            ErasureAlgo::ReedSolomon => write!(f, "{ERASURE_ALGORITHM}"),
90        }
91    }
92}
93
/// ErasureInfo holds erasure coding and bitrot related information.
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone)]
pub struct ErasureInfo {
    /// Algorithm is the String representation of erasure-coding-algorithm
    pub algorithm: String,
    /// DataBlocks is the number of data blocks for erasure-coding
    pub data_blocks: usize,
    /// ParityBlocks is the number of parity blocks for erasure-coding
    pub parity_blocks: usize,
    /// BlockSize is the size of one erasure-coded block
    pub block_size: usize,
    /// Index is the index of the current disk. `FileInfo::is_valid` requires it
    /// to lie in `1..=data_blocks + parity_blocks`.
    pub index: usize,
    /// Distribution is the distribution of the data and parity blocks
    pub distribution: Vec<usize>,
    /// Checksums holds all bitrot checksums of all erasure encoded blocks
    pub checksums: Vec<ChecksumInfo>,
}
112
/// Computes the per-shard size for a block of `block_size` bytes split across
/// `data_shards` data shards.
///
/// The block is divided evenly (rounding up) and the result is then rounded up
/// to the next even number.
///
/// # Panics
///
/// Panics if `data_shards` is zero.
pub fn calc_shard_size(block_size: usize, data_shards: usize) -> usize {
    let per_shard = block_size.div_ceil(data_shards);
    // Adding the low bit bumps odd values to the next even number.
    per_shard + (per_shard & 1)
}
116
117impl ErasureInfo {
118    pub fn get_checksum_info(&self, part_number: usize) -> ChecksumInfo {
119        for sum in &self.checksums {
120            if sum.part_number == part_number {
121                return sum.clone();
122            }
123        }
124
125        ChecksumInfo {
126            algorithm: HashAlgorithm::HighwayHash256S,
127            ..Default::default()
128        }
129    }
130
131    /// Calculate the size of each shard.
132    pub fn shard_size(&self) -> usize {
133        calc_shard_size(self.block_size, self.data_blocks)
134    }
135    /// Calculate the total erasure file size for a given original size.
136    // Returns the final erasure size from the original size
137    pub fn shard_file_size(&self, total_length: i64) -> i64 {
138        if total_length == 0 {
139            return 0;
140        }
141
142        if total_length < 0 {
143            return total_length;
144        }
145
146        let total_length = total_length as usize;
147
148        let num_shards = total_length / self.block_size;
149        let last_block_size = total_length % self.block_size;
150        let last_shard_size = calc_shard_size(last_block_size, self.data_blocks);
151        (num_shards * self.shard_size() + last_shard_size) as i64
152    }
153
154    /// Check if this ErasureInfo equals another ErasureInfo
155    pub fn equals(&self, other: &ErasureInfo) -> bool {
156        self.algorithm == other.algorithm
157            && self.data_blocks == other.data_blocks
158            && self.parity_blocks == other.parity_blocks
159            && self.block_size == other.block_size
160            && self.index == other.index
161            && self.distribution == other.distribution
162    }
163}
164
/// Metadata for a single version of an object.
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct FileInfo {
    /// Name of the volume the object lives in.
    pub volume: String,
    /// Name of the object.
    pub name: String,
    /// Version id of this entry, if any.
    pub version_id: Option<Uuid>,
    /// Whether this entry is the latest version — TODO confirm who maintains it.
    pub is_latest: bool,
    /// Marks the version as deleted: `is_valid` accepts it unconditionally,
    /// `get_data_dir` reports `"delete-marker"`, and the quorum helpers return
    /// the caller-supplied default.
    pub deleted: bool,
    /// Transition status; compared by `transition_info_equals`.
    pub transition_status: String,
    /// Object name on the transition tier; compared by `transition_info_equals`.
    pub transitioned_objname: String,
    /// Transition tier name; a non-empty value makes `is_remote` return `true`.
    pub transition_tier: String,
    /// Version id on the transition tier; compared by `transition_info_equals`.
    pub transition_version_id: Option<Uuid>,
    /// NOTE(review): presumably requests expiry of a restored copy — confirm.
    pub expire_restored: bool,
    /// Data directory id; rendered as a string by `get_data_dir`.
    pub data_dir: Option<Uuid>,
    /// Modification time of this version; compared by `equals`.
    pub mod_time: Option<OffsetDateTime>,
    /// Size of this version; summed by `FileInfoVersions::size`.
    pub size: i64,
    // File mode bits
    pub mode: Option<u32>,
    // WrittenByVersion is the unix time stamp of the version that created this version of the object
    pub written_by_version: Option<u64>,
    /// String metadata. Internal flags (healing, inline data, tier markers, ...)
    /// are stored here as well by the `set_*` helpers.
    pub metadata: HashMap<String, String>,
    /// Object parts; `add_object_part` keeps them sorted by part number.
    pub parts: Vec<ObjectPartInfo>,
    /// Erasure-coding layout of this version.
    pub erasure: ErasureInfo,
    // MarkDeleted marks this version as deleted
    pub mark_deleted: bool,
    // ReplicationState - Internal replication state to be passed back in ObjectInfo
    // pub replication_state: Option<ReplicationState>, // TODO: implement ReplicationState
    /// Optional data payload carried with the metadata.
    /// NOTE(review): presumably the inlined object data — confirm.
    pub data: Option<Bytes>,
    /// NOTE(review): presumably the number of versions of the object — confirm.
    pub num_versions: usize,
    /// NOTE(review): presumably the mod time of the next newer version — confirm.
    pub successor_mod_time: Option<OffsetDateTime>,
    /// NOTE(review): meaning not visible in this file — confirm with callers.
    pub fresh: bool,
    /// NOTE(review): meaning not visible in this file — confirm with callers.
    pub idx: usize,
    // Combined checksum when object was uploaded
    pub checksum: Option<Bytes>,
    /// NOTE(review): presumably whether versioning applies to the object — confirm.
    pub versioned: bool,
}
201
202impl FileInfo {
203    pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
204        let indexs = {
205            let cardinality = data_blocks + parity_blocks;
206            let mut nums = vec![0; cardinality];
207            let key_crc = crc32fast::hash(object.as_bytes());
208
209            let start = key_crc as usize % cardinality;
210            for i in 1..=cardinality {
211                nums[i - 1] = 1 + ((start + i) % cardinality);
212            }
213
214            nums
215        };
216        Self {
217            erasure: ErasureInfo {
218                algorithm: String::from(ERASURE_ALGORITHM),
219                data_blocks,
220                parity_blocks,
221                block_size: BLOCK_SIZE_V2,
222                distribution: indexs,
223                ..Default::default()
224            },
225            ..Default::default()
226        }
227    }
228
229    pub fn is_valid(&self) -> bool {
230        if self.deleted {
231            return true;
232        }
233
234        let data_blocks = self.erasure.data_blocks;
235        let parity_blocks = self.erasure.parity_blocks;
236
237        (data_blocks >= parity_blocks)
238            && (data_blocks > 0)
239            && (self.erasure.index > 0
240                && self.erasure.index <= data_blocks + parity_blocks
241                && self.erasure.distribution.len() == (data_blocks + parity_blocks))
242    }
243
244    pub fn get_etag(&self) -> Option<String> {
245        self.metadata.get("etag").cloned()
246    }
247
248    pub fn write_quorum(&self, quorum: usize) -> usize {
249        if self.deleted {
250            return quorum;
251        }
252
253        if self.erasure.data_blocks == self.erasure.parity_blocks {
254            return self.erasure.data_blocks + 1;
255        }
256
257        self.erasure.data_blocks
258    }
259
260    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
261        let mut buf = Vec::new();
262
263        self.serialize(&mut Serializer::new(&mut buf))?;
264
265        Ok(buf)
266    }
267
268    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
269        let t: FileInfo = rmp_serde::from_slice(buf)?;
270        Ok(t)
271    }
272
273    pub fn add_object_part(
274        &mut self,
275        num: usize,
276        etag: String,
277        part_size: usize,
278        mod_time: Option<OffsetDateTime>,
279        actual_size: i64,
280        index: Option<Bytes>,
281    ) {
282        let part = ObjectPartInfo {
283            etag,
284            number: num,
285            size: part_size,
286            mod_time,
287            actual_size,
288            index,
289            checksums: None,
290        };
291
292        for p in self.parts.iter_mut() {
293            if p.number == num {
294                *p = part;
295                return;
296            }
297        }
298
299        self.parts.push(part);
300
301        self.parts.sort_by(|a, b| a.number.cmp(&b.number));
302    }
303
304    // to_part_offset gets the part index where offset is located, returns part index and offset
305    pub fn to_part_offset(&self, offset: usize) -> Result<(usize, usize)> {
306        if offset == 0 {
307            return Ok((0, 0));
308        }
309
310        let mut part_offset = offset;
311        for (i, part) in self.parts.iter().enumerate() {
312            let part_index = i;
313            if part_offset < part.size {
314                return Ok((part_index, part_offset));
315            }
316
317            part_offset -= part.size
318        }
319
320        Err(Error::other("part not found"))
321    }
322
323    pub fn set_healing(&mut self) {
324        self.metadata.insert(RUSTFS_HEALING.to_string(), "true".to_string());
325    }
326
327    pub fn set_tier_free_version_id(&mut self, version_id: &str) {
328        self.metadata
329            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}"), version_id.to_string());
330    }
331
332    pub fn tier_free_version_id(&self) -> String {
333        self.metadata[&format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_ID}")].clone()
334    }
335
336    pub fn set_tier_free_version(&mut self) {
337        self.metadata
338            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}"), "".to_string());
339    }
340
341    pub fn set_skip_tier_free_version(&mut self) {
342        self.metadata
343            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_SKIP_FV_ID}"), "".to_string());
344    }
345
346    pub fn skip_tier_free_version(&self) -> bool {
347        self.metadata
348            .contains_key(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_SKIP_FV_ID}"))
349    }
350
351    pub fn tier_free_version(&self) -> bool {
352        self.metadata
353            .contains_key(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TIER_FV_MARKER}"))
354    }
355
356    pub fn set_inline_data(&mut self) {
357        self.metadata
358            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").to_owned(), "true".to_owned());
359    }
360
361    pub fn set_data_moved(&mut self) {
362        self.metadata
363            .insert(format!("{RESERVED_METADATA_PREFIX_LOWER}data-moved").to_owned(), "true".to_owned());
364    }
365
366    pub fn inline_data(&self) -> bool {
367        self.metadata
368            .contains_key(format!("{RESERVED_METADATA_PREFIX_LOWER}inline-data").as_str())
369            && !self.is_remote()
370    }
371
372    /// Check if the object is compressed
373    pub fn is_compressed(&self) -> bool {
374        self.metadata
375            .contains_key(&format!("{RESERVED_METADATA_PREFIX_LOWER}compression"))
376    }
377
378    /// Check if the object is remote (transitioned to another tier)
379    pub fn is_remote(&self) -> bool {
380        !self.transition_tier.is_empty()
381    }
382
383    /// Get the data directory for this object
384    pub fn get_data_dir(&self) -> String {
385        if self.deleted {
386            return "delete-marker".to_string();
387        }
388        self.data_dir.map_or("".to_string(), |dir| dir.to_string())
389    }
390
391    /// Read quorum returns expected read quorum for this FileInfo
392    pub fn read_quorum(&self, dquorum: usize) -> usize {
393        if self.deleted {
394            return dquorum;
395        }
396        self.erasure.data_blocks
397    }
398
399    /// Create a shallow copy with minimal information for READ MRF checks
400    pub fn shallow_copy(&self) -> Self {
401        Self {
402            volume: self.volume.clone(),
403            name: self.name.clone(),
404            version_id: self.version_id,
405            deleted: self.deleted,
406            erasure: self.erasure.clone(),
407            ..Default::default()
408        }
409    }
410
411    /// Check if this FileInfo equals another FileInfo
412    pub fn equals(&self, other: &FileInfo) -> bool {
413        // Check if both are compressed or both are not compressed
414        if self.is_compressed() != other.is_compressed() {
415            return false;
416        }
417
418        // Check transition info
419        if !self.transition_info_equals(other) {
420            return false;
421        }
422
423        // Check mod time
424        if self.mod_time != other.mod_time {
425            return false;
426        }
427
428        // Check erasure info
429        self.erasure.equals(&other.erasure)
430    }
431
432    /// Check if transition related information are equal
433    pub fn transition_info_equals(&self, other: &FileInfo) -> bool {
434        self.transition_status == other.transition_status
435            && self.transition_tier == other.transition_tier
436            && self.transitioned_objname == other.transitioned_objname
437            && self.transition_version_id == other.transition_version_id
438    }
439
440    /// Check if metadata maps are equal
441    pub fn metadata_equals(&self, other: &FileInfo) -> bool {
442        if self.metadata.len() != other.metadata.len() {
443            return false;
444        }
445        for (k, v) in &self.metadata {
446            if other.metadata.get(k) != Some(v) {
447                return false;
448            }
449        }
450        true
451    }
452
453    /// Check if replication related fields are equal
454    pub fn replication_info_equals(&self, other: &FileInfo) -> bool {
455        self.mark_deleted == other.mark_deleted
456        // TODO: Add replication_state comparison when implemented
457        // && self.replication_state == other.replication_state
458    }
459}
460
/// All versions of one object, together with its free versions.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileInfoVersions {
    // Name of the volume.
    pub volume: String,

    // Name of the file.
    pub name: String,

    // Represents the latest mod time of the
    // latest version.
    pub latest_mod_time: Option<OffsetDateTime>,

    /// Versions of the object; searched by `find_version_index` and summed by `size`.
    pub versions: Vec<FileInfo>,
    /// Free versions of the object.
    /// NOTE(review): presumably tier free-version entries — confirm against callers.
    pub free_versions: Vec<FileInfo>,
}
476
477impl FileInfoVersions {
478    pub fn find_version_index(&self, vid: Uuid) -> Option<usize> {
479        self.versions.iter().position(|v| v.version_id == Some(vid))
480    }
481
482    /// Calculate the total size of all versions for this object
483    pub fn size(&self) -> i64 {
484        self.versions.iter().map(|v| v.size).sum()
485    }
486}
487
/// Wrapper around a raw byte buffer.
/// NOTE(review): presumably the undecoded, serialized file metadata — confirm against callers.
#[derive(Default, Serialize, Deserialize)]
pub struct RawFileInfo {
    /// The raw bytes.
    pub buf: Vec<u8>,
}
492
/// A batch of `FileInfo` entries plus a truncation flag.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FilesInfo {
    /// The entries in this batch.
    pub files: Vec<FileInfo>,
    /// NOTE(review): presumably `true` when more entries exist beyond this batch — confirm.
    pub is_truncated: bool,
}