Skip to main content

nydus_storage/cache/state/
blob_state_map.rs

1// Copyright 2021 Ant Group. All rights reserved.
2// Copyright (C) 2021 Alibaba Cloud. All rights reserved.
3//
4// SPDX-License-Identifier: Apache-2.0
5
6use std::any::Any;
7use std::collections::HashMap;
8use std::fmt::Display;
9use std::hash::Hash;
10use std::io::Result;
11use std::sync::{Arc, Condvar, Mutex, WaitTimeoutResult};
12use std::time::Duration;
13
14use crate::cache::state::{BlobRangeMap, ChunkIndexGetter, ChunkMap, IndexedChunkMap, RangeMap};
15use crate::cache::SINGLE_INFLIGHT_WAIT_TIMEOUT;
16use crate::device::BlobChunkInfo;
17use crate::{StorageError, StorageResult};
18
/// State of a chunk tracked by a `Slot`.
#[derive(Clone, Copy, PartialEq)]
enum Status {
    /// Initial state of a freshly created slot.
    Inflight,
    /// State stored by `Slot::done()` before the waiters are woken up.
    Complete,
}
24
25struct Slot {
26    state: Mutex<Status>,
27    condvar: Condvar,
28}
29
30impl Slot {
31    fn new() -> Self {
32        Slot {
33            state: Mutex::new(Status::Inflight),
34            condvar: Condvar::new(),
35        }
36    }
37
38    fn notify(&self) {
39        self.condvar.notify_all();
40    }
41
42    fn done(&self) {
43        // Not expect poisoned lock here
44        *self.state.lock().unwrap() = Status::Complete;
45        self.notify();
46    }
47
48    fn wait_for_inflight(&self, timeout: Duration) -> StorageResult<Status> {
49        let mut state = self.state.lock().unwrap();
50        let mut tor: WaitTimeoutResult;
51
52        while *state == Status::Inflight {
53            // Do not expect poisoned lock, so unwrap here.
54            let r = self.condvar.wait_timeout(state, timeout).unwrap();
55            state = r.0;
56            tor = r.1;
57            if tor.timed_out() {
58                return Err(StorageError::Timeout);
59            }
60        }
61
62        Ok(*state)
63    }
64}
65
66/// Adapter structure to enable concurrent chunk readiness manipulating based on a base [ChunkMap]
67/// object.
68///
69/// A base [ChunkMap], such as [IndexedChunkMap](../chunk_indexed/struct.IndexedChunkMap.html), only
70/// tracks chunk readiness state, but doesn't support concurrent manipulating of the chunk readiness
71/// state. The `BlobStateMap` structure acts as an adapter to enable concurrent chunk readiness
72/// state manipulation.
73pub struct BlobStateMap<C, I> {
74    c: C,
75    inflight_tracer: Mutex<HashMap<I, Arc<Slot>>>,
76}
77
78impl<C, I> From<C> for BlobStateMap<C, I>
79where
80    C: ChunkMap + ChunkIndexGetter<Index = I>,
81    I: Eq + Hash + Display,
82{
83    fn from(c: C) -> Self {
84        Self {
85            c,
86            inflight_tracer: Mutex::new(HashMap::new()),
87        }
88    }
89}
90
91impl<C, I> ChunkMap for BlobStateMap<C, I>
92where
93    C: ChunkMap + ChunkIndexGetter<Index = I>,
94    I: Eq + Hash + Display + Send + 'static,
95{
96    fn is_ready(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
97        self.c.is_ready(chunk)
98    }
99
100    fn is_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
101        let index = C::get_index(chunk);
102        Ok(self.inflight_tracer.lock().unwrap().get(&index).is_some())
103    }
104
105    fn check_ready_and_mark_pending(&self, chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
106        let mut ready = self.c.is_ready(chunk).map_err(StorageError::CacheIndex)?;
107
108        if ready {
109            return Ok(true);
110        }
111
112        let index = C::get_index(chunk);
113        let mut guard = self.inflight_tracer.lock().unwrap();
114
115        if let Some(i) = guard.get(&index).cloned() {
116            drop(guard);
117            let result = i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
118            if let Err(StorageError::Timeout) = result {
119                warn!(
120                    "Waiting for backend IO expires. chunk index {}, compressed offset {}",
121                    index,
122                    chunk.compressed_offset()
123                );
124
125                Err(StorageError::Timeout)
126            } else {
127                // Check if the chunk is ready in local cache again. It should be READY
128                // since wait_for_inflight must return OK in this branch by one more check.
129                self.check_ready_and_mark_pending(chunk)
130            }
131        } else {
132            // Double check to close the window where prior slot was just removed after backend IO
133            // returned.
134            if self.c.is_ready(chunk).map_err(StorageError::CacheIndex)? {
135                ready = true;
136            } else {
137                guard.insert(index, Arc::new(Slot::new()));
138            }
139            Ok(ready)
140        }
141    }
142
143    fn set_ready_and_clear_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<()> {
144        let res = self.c.set_ready_and_clear_pending(chunk);
145        self.clear_pending(chunk);
146        res
147    }
148
149    fn clear_pending(&self, chunk: &dyn BlobChunkInfo) {
150        let index = C::get_index(chunk);
151        let mut guard = self.inflight_tracer.lock().unwrap();
152        if let Some(i) = guard.remove(&index) {
153            i.done();
154        }
155    }
156
157    fn is_persist(&self) -> bool {
158        self.c.is_persist()
159    }
160
161    fn as_range_map(&self) -> Option<&dyn RangeMap<I = u32>> {
162        let any = self as &dyn Any;
163
164        any.downcast_ref::<BlobStateMap<IndexedChunkMap, u32>>()
165            .map(|v| v as &dyn RangeMap<I = u32>)
166    }
167}
168
169impl RangeMap for BlobStateMap<IndexedChunkMap, u32> {
170    type I = u32;
171
172    fn is_range_all_ready(&self) -> bool {
173        self.c.is_range_all_ready()
174    }
175
176    fn is_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
177        self.c.is_range_ready(start, count)
178    }
179
180    fn check_range_ready_and_mark_pending(
181        &self,
182        start: Self::I,
183        count: Self::I,
184    ) -> Result<Option<Vec<Self::I>>> {
185        let pending = match self.c.check_range_ready_and_mark_pending(start, count) {
186            Err(e) => return Err(e),
187            Ok(None) => return Ok(None),
188            Ok(Some(v)) => {
189                if v.is_empty() {
190                    return Ok(None);
191                }
192                v
193            }
194        };
195
196        let mut res = Vec::with_capacity(pending.len());
197        let mut guard = self.inflight_tracer.lock().unwrap();
198        for index in pending.iter() {
199            if guard.get(index).is_none() {
200                // Double check to close the window where prior slot was just removed after backend
201                // IO returned.
202                if !self.c.is_range_ready(*index, 1)? {
203                    guard.insert(*index, Arc::new(Slot::new()));
204                    res.push(*index);
205                }
206            }
207        }
208
209        Ok(Some(res))
210    }
211
212    fn set_range_ready_and_clear_pending(&self, start: Self::I, count: Self::I) -> Result<()> {
213        let res = self.c.set_range_ready_and_clear_pending(start, count);
214        self.clear_range_pending(start, count);
215        res
216    }
217
218    fn clear_range_pending(&self, start: Self::I, count: Self::I) {
219        let count = std::cmp::min(count, u32::MAX - start);
220        let end = start + count;
221        let mut guard = self.inflight_tracer.lock().unwrap();
222
223        for index in start..end {
224            if let Some(i) = guard.remove(&index) {
225                i.done();
226            }
227        }
228    }
229
230    fn wait_for_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
231        let count = std::cmp::min(count, u32::MAX - start);
232        let end = start + count;
233        if self.is_range_ready(start, count)? {
234            return Ok(true);
235        }
236
237        let mut guard = self.inflight_tracer.lock().unwrap();
238        for index in start..end {
239            if let Some(i) = guard.get(&index).cloned() {
240                drop(guard);
241                let result =
242                    i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
243                if let Err(StorageError::Timeout) = result {
244                    warn!(
245                        "Waiting for range backend IO expires. chunk index {}. range[{}, {}]",
246                        index, start, count
247                    );
248                    break;
249                };
250                if !self.c.is_range_ready(index, 1)? {
251                    return Ok(false);
252                }
253                guard = self.inflight_tracer.lock().unwrap();
254            }
255        }
256
257        self.is_range_ready(start, count)
258    }
259}
260
261impl RangeMap for BlobStateMap<BlobRangeMap, u64> {
262    type I = u64;
263
264    fn is_range_all_ready(&self) -> bool {
265        self.c.is_range_all_ready()
266    }
267
268    fn is_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
269        self.c.is_range_ready(start, count)
270    }
271
272    fn check_range_ready_and_mark_pending(
273        &self,
274        start: Self::I,
275        count: Self::I,
276    ) -> Result<Option<Vec<Self::I>>> {
277        let pending = match self.c.check_range_ready_and_mark_pending(start, count) {
278            Err(e) => return Err(e),
279            Ok(None) => return Ok(None),
280            Ok(Some(v)) => {
281                if v.is_empty() {
282                    return Ok(None);
283                }
284                v
285            }
286        };
287
288        let mut res = Vec::with_capacity(pending.len());
289        let mut guard = self.inflight_tracer.lock().unwrap();
290        for index in pending.iter() {
291            if guard.get(index).is_none() {
292                // Double check to close the window where prior slot was just removed after backend
293                // IO returned.
294                if !self.c.is_range_ready(*index, 1)? {
295                    guard.insert(*index, Arc::new(Slot::new()));
296                    res.push(*index);
297                }
298            }
299        }
300
301        Ok(Some(res))
302    }
303
304    fn set_range_ready_and_clear_pending(&self, start: Self::I, count: Self::I) -> Result<()> {
305        let res = self.c.set_range_ready_and_clear_pending(start, count);
306        self.clear_range_pending(start, count);
307        res
308    }
309
310    fn clear_range_pending(&self, start: Self::I, count: Self::I) {
311        let (start_index, end_index) = match self.c.get_range(start, count) {
312            Ok(v) => v,
313            Err(_) => {
314                debug_assert!(false);
315                return;
316            }
317        };
318
319        let mut guard = self.inflight_tracer.lock().unwrap();
320        for index in start_index..end_index {
321            let idx = (index as u64) << self.c.shift;
322            if let Some(i) = guard.remove(&idx) {
323                i.done();
324            }
325        }
326    }
327
328    fn wait_for_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
329        if self.c.is_range_ready(start, count)? {
330            return Ok(true);
331        }
332
333        let (start_index, end_index) = self.c.get_range(start, count)?;
334        let mut guard = self.inflight_tracer.lock().unwrap();
335        for index in start_index..end_index {
336            let idx = (index as u64) << self.c.shift;
337            if let Some(i) = guard.get(&idx).cloned() {
338                drop(guard);
339                let result =
340                    i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
341                if let Err(StorageError::Timeout) = result {
342                    warn!(
343                        "Waiting for range backend IO expires. chunk index {}. range[{}, {}]",
344                        index, start, count
345                    );
346                    break;
347                };
348                if !self.c.is_range_ready(idx, 1)? {
349                    return Ok(false);
350                }
351                guard = self.inflight_tracer.lock().unwrap();
352            }
353        }
354
355        self.c.is_range_ready(start, count)
356    }
357}
358
359impl BlobStateMap<BlobRangeMap, u64> {
360    /// Create a new instance of `BlobStateMap` from a `BlobRangeMap` object.
361    pub fn from_range_map(map: BlobRangeMap) -> Self {
362        Self {
363            c: map,
364            inflight_tracer: Mutex::new(HashMap::new()),
365        }
366    }
367}
368
369#[cfg(test)]
370pub(crate) mod tests {
371    use std::sync::Arc;
372    use std::thread;
373    use std::time::Instant;
374
375    use nydus_utils::digest::Algorithm::Blake3;
376    use nydus_utils::digest::{Algorithm, RafsDigest};
377    use vmm_sys_util::tempdir::TempDir;
378    use vmm_sys_util::tempfile::TempFile;
379
380    use super::*;
381    use crate::cache::state::DigestedChunkMap;
382    use crate::device::BlobChunkInfo;
383    use crate::test::MockChunkInfo;
384
385    struct Chunk {
386        index: u32,
387        digest: RafsDigest,
388    }
389
390    impl Chunk {
391        fn new(index: u32) -> Arc<Self> {
392            Arc::new(Self {
393                index,
394                digest: RafsDigest::from_buf(
395                    unsafe { std::slice::from_raw_parts(&index as *const u32 as *const u8, 4) },
396                    Algorithm::Blake3,
397                ),
398            })
399        }
400    }
401
402    impl BlobChunkInfo for Chunk {
403        fn chunk_id(&self) -> &RafsDigest {
404            &self.digest
405        }
406
407        fn id(&self) -> u32 {
408            self.index
409        }
410
411        fn blob_index(&self) -> u32 {
412            0
413        }
414
415        fn compressed_offset(&self) -> u64 {
416            unimplemented!();
417        }
418
419        fn compressed_size(&self) -> u32 {
420            unimplemented!();
421        }
422
423        fn uncompressed_offset(&self) -> u64 {
424            unimplemented!();
425        }
426
427        fn uncompressed_size(&self) -> u32 {
428            unimplemented!();
429        }
430
431        fn is_batch(&self) -> bool {
432            unimplemented!();
433        }
434
435        fn is_compressed(&self) -> bool {
436            unimplemented!();
437        }
438
439        fn is_encrypted(&self) -> bool {
440            false
441        }
442
443        fn has_crc32(&self) -> bool {
444            unimplemented!();
445        }
446
447        fn crc32(&self) -> u32 {
448            unimplemented!();
449        }
450
451        fn as_any(&self) -> &dyn Any {
452            self
453        }
454    }
455
456    #[test]
457    fn test_chunk_map() {
458        let dir = TempDir::new().unwrap();
459        let blob_path = dir.as_path().join("blob-1");
460        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
461        let chunk_count = 1000000;
462        let skip_index = 77;
463
464        let indexed_chunk_map1 = Arc::new(BlobStateMap::from(
465            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
466        ));
467        let indexed_chunk_map2 = Arc::new(BlobStateMap::from(
468            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
469        ));
470        let indexed_chunk_map3 = Arc::new(BlobStateMap::from(
471            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
472        ));
473
474        let now = Instant::now();
475
476        let h1 = thread::spawn(move || {
477            for idx in 0..chunk_count {
478                let chunk = Chunk::new(idx);
479                if idx % skip_index != 0 {
480                    indexed_chunk_map1
481                        .set_ready_and_clear_pending(chunk.as_ref())
482                        .unwrap();
483                }
484            }
485        });
486
487        let h2 = thread::spawn(move || {
488            for idx in 0..chunk_count {
489                let chunk = Chunk::new(idx);
490                if idx % skip_index != 0 {
491                    indexed_chunk_map2
492                        .set_ready_and_clear_pending(chunk.as_ref())
493                        .unwrap();
494                }
495            }
496        });
497
498        h1.join()
499            .map_err(|e| {
500                error!("Join error {:?}", e);
501                e
502            })
503            .unwrap();
504        h2.join()
505            .map_err(|e| {
506                error!("Join error {:?}", e);
507                e
508            })
509            .unwrap();
510
511        println!(
512            "IndexedChunkMap Concurrency: {}ms",
513            now.elapsed().as_millis()
514        );
515
516        for idx in 0..chunk_count {
517            let chunk = Chunk::new(idx);
518
519            let has_ready = indexed_chunk_map3
520                .check_ready_and_mark_pending(chunk.as_ref())
521                .unwrap();
522            if idx % skip_index == 0 {
523                if has_ready {
524                    panic!("indexed chunk map: index {} shouldn't be ready", idx);
525                }
526            } else if !has_ready {
527                panic!("indexed chunk map: index {} should be ready", idx);
528            }
529        }
530    }
531
532    fn iterate(chunks: &[Arc<Chunk>], chunk_map: &dyn ChunkMap, chunk_count: u32) {
533        for idx in 0..chunk_count {
534            chunk_map
535                .set_ready_and_clear_pending(chunks[idx as usize].as_ref())
536                .unwrap();
537        }
538        for idx in 0..chunk_count {
539            assert!(chunk_map
540                .check_ready_and_mark_pending(chunks[idx as usize].as_ref())
541                .unwrap(),);
542        }
543    }
544
545    #[test]
546    fn test_chunk_map_perf() {
547        let dir = TempDir::new().unwrap();
548        let blob_path = dir.as_path().join("blob-1");
549        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
550        let chunk_count = 1000000;
551
552        let mut chunks = Vec::new();
553        for idx in 0..chunk_count {
554            chunks.push(Chunk::new(idx))
555        }
556
557        let indexed_chunk_map =
558            BlobStateMap::from(IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap());
559        let now = Instant::now();
560        iterate(&chunks, &indexed_chunk_map as &dyn ChunkMap, chunk_count);
561        let elapsed1 = now.elapsed().as_millis();
562
563        let digested_chunk_map = BlobStateMap::from(DigestedChunkMap::new());
564        let now = Instant::now();
565        iterate(&chunks, &digested_chunk_map as &dyn ChunkMap, chunk_count);
566        let elapsed2 = now.elapsed().as_millis();
567
568        println!(
569            "IndexedChunkMap vs DigestedChunkMap: {}ms vs {}ms",
570            elapsed1, elapsed2
571        );
572    }
573
574    #[test]
575    fn test_inflight_tracer() {
576        let chunk_1: Arc<dyn BlobChunkInfo> = Arc::new({
577            let mut c = MockChunkInfo::new();
578            c.index = 1;
579            c.block_id = RafsDigest::from_buf("hello world".as_bytes(), Blake3);
580            c
581        });
582        let chunk_2: Arc<dyn BlobChunkInfo> = Arc::new({
583            let mut c = MockChunkInfo::new();
584            c.index = 2;
585            c.block_id = RafsDigest::from_buf("hello world 2".as_bytes(), Blake3);
586            c
587        });
588        // indexed ChunkMap
589        let tmp_file = TempFile::new().unwrap();
590        let index_map = Arc::new(BlobStateMap::from(
591            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
592        ));
593        index_map
594            .check_ready_and_mark_pending(chunk_1.as_ref())
595            .unwrap();
596        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);
597        index_map
598            .check_ready_and_mark_pending(chunk_2.as_ref())
599            .unwrap();
600        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 2);
601        index_map
602            .check_ready_and_mark_pending(chunk_1.as_ref())
603            .unwrap_err();
604        index_map
605            .check_ready_and_mark_pending(chunk_2.as_ref())
606            .unwrap_err();
607        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 2);
608
609        index_map
610            .set_ready_and_clear_pending(chunk_1.as_ref())
611            .unwrap();
612        assert!(index_map
613            .check_ready_and_mark_pending(chunk_1.as_ref())
614            .unwrap(),);
615        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);
616
617        index_map.clear_pending(chunk_2.as_ref());
618        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);
619        assert!(!index_map
620            .check_ready_and_mark_pending(chunk_2.as_ref())
621            .unwrap(),);
622        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);
623        index_map.clear_pending(chunk_2.as_ref());
624        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);
625        index_map
626            .set_ready_and_clear_pending(chunk_2.as_ref())
627            .unwrap();
628        assert!(index_map
629            .check_ready_and_mark_pending(chunk_2.as_ref())
630            .unwrap(),);
631        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);
632
633        // digested ChunkMap
634        let digest_map = Arc::new(BlobStateMap::from(DigestedChunkMap::new()));
635        digest_map
636            .check_ready_and_mark_pending(chunk_1.as_ref())
637            .unwrap();
638        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 1);
639        digest_map
640            .check_ready_and_mark_pending(chunk_2.as_ref())
641            .unwrap();
642        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 2);
643        digest_map
644            .check_ready_and_mark_pending(chunk_1.as_ref())
645            .unwrap_err();
646        digest_map
647            .check_ready_and_mark_pending(chunk_2.as_ref())
648            .unwrap_err();
649        digest_map
650            .set_ready_and_clear_pending(chunk_1.as_ref())
651            .unwrap();
652        assert!(digest_map
653            .check_ready_and_mark_pending(chunk_1.as_ref())
654            .unwrap(),);
655        digest_map.clear_pending(chunk_2.as_ref());
656        assert!(!digest_map
657            .check_ready_and_mark_pending(chunk_2.as_ref())
658            .unwrap(),);
659        digest_map.clear_pending(chunk_2.as_ref());
660        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 0);
661    }
662
663    #[test]
664    fn test_inflight_tracer_race() {
665        let tmp_file = TempFile::new().unwrap();
666        let map = Arc::new(BlobStateMap::from(
667            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
668        ));
669
670        let chunk_4: Arc<dyn BlobChunkInfo> = Arc::new({
671            let mut c = MockChunkInfo::new();
672            c.index = 4;
673            c
674        });
675
676        assert!(!map
677            .as_ref()
678            .check_ready_and_mark_pending(chunk_4.as_ref())
679            .unwrap(),);
680        let map_cloned = map.clone();
681        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);
682
683        let chunk_4_cloned = chunk_4.clone();
684        let t1 = thread::Builder::new()
685            .spawn(move || {
686                for _ in 0..4 {
687                    let ready = map_cloned
688                        .check_ready_and_mark_pending(chunk_4_cloned.as_ref())
689                        .unwrap();
690                    assert!(ready);
691                }
692            })
693            .unwrap();
694
695        let map_cloned_2 = map.clone();
696        let chunk_4_cloned_2 = chunk_4.clone();
697        let t2 = thread::Builder::new()
698            .spawn(move || {
699                for _ in 0..2 {
700                    let ready = map_cloned_2
701                        .check_ready_and_mark_pending(chunk_4_cloned_2.as_ref())
702                        .unwrap();
703                    assert!(ready);
704                }
705            })
706            .unwrap();
707
708        thread::sleep(Duration::from_secs(1));
709
710        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();
711
712        // Fuzz
713        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();
714        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();
715
716        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 0);
717
718        t1.join().unwrap();
719        t2.join().unwrap();
720    }
721
722    #[test]
723    /// Case description:
724    ///     Never invoke `set_ready` method, thus to let each caller of `has_ready` reach
725    ///     a point of timeout.
726    /// Expect:
727    ///     The chunk of index 4 is never marked as ready/downloaded.
728    ///     Each caller of `has_ready` can escape from where it is blocked.
729    ///     After timeout, no slot is left in inflight tracer.
730    fn test_inflight_tracer_timeout() {
731        let tmp_file = TempFile::new().unwrap();
732        let map = Arc::new(BlobStateMap::from(
733            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
734        ));
735
736        let chunk_4: Arc<dyn BlobChunkInfo> = Arc::new({
737            let mut c = MockChunkInfo::new();
738            c.index = 4;
739            c
740        });
741
742        map.as_ref()
743            .check_ready_and_mark_pending(chunk_4.as_ref())
744            .unwrap();
745        let map_cloned = map.clone();
746
747        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);
748
749        let chunk_4_cloned = chunk_4.clone();
750        let t1 = thread::Builder::new()
751            .spawn(move || {
752                for _ in 0..4 {
753                    map_cloned
754                        .check_ready_and_mark_pending(chunk_4_cloned.as_ref())
755                        .unwrap_err();
756                }
757            })
758            .unwrap();
759
760        t1.join().unwrap();
761
762        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);
763
764        map.as_ref()
765            .check_ready_and_mark_pending(chunk_4.as_ref())
766            .unwrap_err();
767        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);
768
769        map.clear_pending(chunk_4.as_ref());
770        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 0);
771    }
772
773    #[test]
774    fn test_inflight_tracer_race_range() {
775        let tmp_file = TempFile::new().unwrap();
776        let map = Arc::new(BlobStateMap::from(
777            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
778        ));
779
780        assert!(!map.is_range_all_ready());
781        assert!(!map.is_range_ready(0, 1).unwrap());
782        assert!(!map.is_range_ready(9, 1).unwrap());
783        assert!(map.is_range_ready(10, 1).is_err());
784        assert_eq!(
785            map.check_range_ready_and_mark_pending(0, 2).unwrap(),
786            Some(vec![0, 1])
787        );
788        map.set_range_ready_and_clear_pending(0, 2).unwrap();
789        assert_eq!(map.check_range_ready_and_mark_pending(0, 2).unwrap(), None);
790        map.wait_for_range_ready(0, 2).unwrap();
791        assert_eq!(
792            map.check_range_ready_and_mark_pending(1, 2).unwrap(),
793            Some(vec![2])
794        );
795        map.set_range_ready_and_clear_pending(2, 1).unwrap();
796        map.set_range_ready_and_clear_pending(3, 7).unwrap();
797        assert!(map.is_range_ready(0, 1).unwrap());
798        assert!(map.is_range_ready(9, 1).unwrap());
799        assert!(map.is_range_all_ready());
800    }
801}