Skip to main content

nydus_storage/cache/state/
range_map.rs

1// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5use std::io::Result;
6
7use crate::cache::state::persist_map::PersistMap;
8use crate::cache::state::RangeMap;
9
10/// The name suffix of blob chunk_map file, named $blob_id.chunk_map.
11const FILE_SUFFIX: &str = "range_map";
12
13/// An implementation of [RangeMap] to support cache state tracking by using a bitmap file.
14///
15/// The `BlobRangeMap` is an implementation of [RangeMap] which uses a bitmap file and atomic
16/// bitmap operations to track readiness state. It creates or opens a file with the name
17/// `$blob_id.range_map` to record whether a data range has been cached by the blob cache, and
18/// atomic bitmap operations are used to manipulate the state bit. The bitmap file will be persisted
19/// to disk.
20pub struct BlobRangeMap {
21    pub(crate) shift: u32,
22    map: PersistMap,
23}
24
25impl BlobRangeMap {
26    /// Create a new instance of `BlobRangeMap`.
27    pub fn new(blob_path: &str, count: u32, shift: u32) -> Result<Self> {
28        let filename = format!("{}.{}", blob_path, FILE_SUFFIX);
29        debug_assert!(shift < 64);
30
31        PersistMap::open(&filename, count, true, true).map(|map| BlobRangeMap { shift, map })
32    }
33
34    /// Create a new instance of `BlobRangeMap` from an existing chunk map file.
35    pub fn open(blob_id: &str, workdir: &str, count: u32, shift: u32) -> Result<Self> {
36        let filename = format!("{}/{}.{}", workdir, blob_id, FILE_SUFFIX);
37        debug_assert!(shift < 64);
38
39        PersistMap::open(&filename, count, false, true).map(|map| BlobRangeMap { shift, map })
40    }
41
42    pub(crate) fn get_range(&self, start: u64, count: u64) -> Result<(u32, u32)> {
43        if let Some(end) = start.checked_add(count) {
44            let start_index = start >> self.shift as u64;
45            let end_index = (end - 1) >> self.shift as u64;
46            if start_index > u32::MAX as u64 || end_index > u32::MAX as u64 {
47                Err(einval!())
48            } else {
49                self.map.validate_index(start_index as u32)?;
50                self.map.validate_index(end_index as u32)?;
51                Ok((start_index as u32, end_index as u32 + 1))
52            }
53        } else {
54            Err(einval!())
55        }
56    }
57}
58
59impl RangeMap for BlobRangeMap {
60    type I = u64;
61
62    fn is_range_all_ready(&self) -> bool {
63        self.map.is_range_all_ready()
64    }
65
66    /// Check whether all data in the range are ready for use.
67    fn is_range_ready(&self, start: u64, count: u64) -> Result<bool> {
68        if !self.is_range_all_ready() {
69            let (start_index, end_index) = self.get_range(start, count)?;
70            for index in start_index..end_index {
71                if !self.map.is_chunk_ready(index).0 {
72                    return Ok(false);
73                }
74            }
75        }
76
77        Ok(true)
78    }
79
80    fn check_range_ready_and_mark_pending(
81        &self,
82        start: u64,
83        count: u64,
84    ) -> Result<Option<Vec<u64>>> {
85        if self.is_range_all_ready() {
86            Ok(None)
87        } else {
88            let (start_index, end_index) = self.get_range(start, count)?;
89            let mut vec = Vec::with_capacity(count as usize);
90
91            for index in start_index..end_index {
92                if !self.map.is_chunk_ready(index).0 {
93                    vec.push((index as u64) << self.shift);
94                }
95            }
96
97            if vec.is_empty() {
98                Ok(None)
99            } else {
100                Ok(Some(vec))
101            }
102        }
103    }
104
105    fn set_range_ready_and_clear_pending(&self, start: u64, count: u64) -> Result<()> {
106        if !self.is_range_all_ready() {
107            let (start_index, end_index) = self.get_range(start, count)?;
108
109            for index in start_index..end_index {
110                self.map.set_chunk_ready(index)?;
111            }
112        }
113
114        Ok(())
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use std::sync::Arc;
121    use std::thread;
122    use std::time::Instant;
123
124    use vmm_sys_util::tempdir::TempDir;
125
126    use super::super::BlobStateMap;
127    use super::*;
128
129    #[test]
130    fn test_range_map() {
131        let dir = TempDir::new().unwrap();
132        let blob_path = dir.as_path().join("blob-1");
133        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
134        let range_count = 1000000;
135        let skip_index = 77;
136
137        let map1 = Arc::new(BlobStateMap::from_range_map(
138            BlobRangeMap::new(&blob_path, range_count, 12).unwrap(),
139        ));
140        let map2 = Arc::new(BlobStateMap::from_range_map(
141            BlobRangeMap::new(&blob_path, range_count, 12).unwrap(),
142        ));
143        let map3 = Arc::new(BlobStateMap::from_range_map(
144            BlobRangeMap::new(&blob_path, range_count, 12).unwrap(),
145        ));
146
147        let now = Instant::now();
148
149        let h1 = thread::spawn(move || {
150            for idx in 0..range_count {
151                if idx % skip_index != 0 {
152                    let addr = ((idx as u64) << 12) + (idx as u64 % 0x1000);
153                    map1.set_range_ready_and_clear_pending(addr, 1).unwrap();
154                }
155            }
156        });
157
158        let h2 = thread::spawn(move || {
159            for idx in 0..range_count {
160                if idx % skip_index != 0 {
161                    let addr = ((idx as u64) << 12) + (idx as u64 % 0x1000);
162                    map2.set_range_ready_and_clear_pending(addr, 1).unwrap();
163                }
164            }
165        });
166
167        h1.join()
168            .map_err(|e| {
169                error!("Join error {:?}", e);
170                e
171            })
172            .unwrap();
173        h2.join()
174            .map_err(|e| {
175                error!("Join error {:?}", e);
176                e
177            })
178            .unwrap();
179
180        println!("BlobRangeMap Concurrency: {}ms", now.elapsed().as_millis());
181
182        for idx in 0..range_count {
183            let addr = ((idx as u64) << 12) + (idx as u64 % 0x1000);
184
185            let is_ready = map3.is_range_ready(addr, 1).unwrap();
186            if idx % skip_index == 0 {
187                if is_ready {
188                    panic!("indexed chunk map: index {} shouldn't be ready", idx);
189                }
190            } else if !is_ready {
191                panic!("indexed chunk map: index {} should be ready", idx);
192            }
193        }
194    }
195
196    #[test]
197    fn test_range_map_state() {
198        let dir = TempDir::new().unwrap();
199        let blob_path = dir.as_path().join("blob-1");
200        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
201        let range_count = 100;
202
203        let map = BlobRangeMap::new(&blob_path, range_count, 0).unwrap();
204        assert_eq!(
205            map.check_range_ready_and_mark_pending(1, 10)
206                .unwrap()
207                .unwrap()
208                .len(),
209            10
210        );
211        assert!(map.set_range_ready_and_clear_pending(1, 10).is_ok());
212        assert!(map
213            .check_range_ready_and_mark_pending(1, 10)
214            .unwrap()
215            .is_none());
216    }
217}