qcow2_rs/dev.rs

1use crate::cache::AsyncLruCache;
2use crate::cache::AsyncLruCacheEntry;
3use crate::error::Qcow2Result;
4use crate::helpers::{qcow2_type_of, Qcow2IoBuf};
5use crate::meta::{
6    L1Entry, L1Table, L2Entry, L2Table, Mapping, MappingSource, Qcow2Header, RefBlock, RefTable,
7    RefTableEntry, SplitGuestOffset, Table, TableEntry,
8};
9use crate::ops::*;
10use crate::zero_buf;
11use async_recursion::async_recursion;
12use futures_locks::{
13    Mutex as AsyncMutex, RwLock as AsyncRwLock, RwLockWriteGuard as LockWriteGuard,
14};
15use miniz_oxide::inflate::core::{decompress as inflate, DecompressorOxide};
16use miniz_oxide::inflate::TINFLStatus;
17use std::cell::RefCell;
18use std::collections::HashMap;
19use std::mem::size_of;
20use std::ops::RangeInclusive;
21use std::path::Path;
22use std::path::PathBuf;
23use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
24
/// all readable Qcow2 info, packed into a single cache line
26#[derive(Debug)]
27pub struct Qcow2Info {
    // minimum block size for writing data out to the image; it is usually
    // the logical block size of the disk that stores this image, so
    // all metadata has to be aligned to this block size
31    pub(crate) block_size_shift: u8,
32    pub(crate) cluster_shift: u8,
33    pub(crate) l2_index_shift: u8,
34    pub(crate) l2_slice_index_shift: u8,
35    pub(crate) l2_slice_bits: u8,
36    pub(crate) refcount_order: u8,
37    pub(crate) rb_slice_bits: u8,
38    pub(crate) rb_index_shift: u8,
39    pub(crate) rb_slice_index_shift: u8,
40
41    pub(crate) flags: u16,
42    pub(crate) l2_slice_entries: u32,
43    pub(crate) in_cluster_offset_mask: usize,
44    pub(crate) l2_index_mask: usize,
45    pub(crate) rb_index_mask: usize,
46
47    pub(crate) l2_cache_cnt: u32,
48    pub(crate) rb_cache_cnt: u32,
49    pub(crate) virtual_size: u64,
50}
51
52impl Qcow2Info {
    const READ_ONLY: u16 = 1 << 0;
    const HAS_BACK_FILE: u16 = 1 << 1; // this image has an underlying backing image
    const BACK_FILE: u16 = 1 << 2; // this device is a backing image, and read-only
56    pub fn new(h: &Qcow2Header, p: &Qcow2DevParams) -> Qcow2Result<Qcow2Info> {
57        let block_size_shift = p.get_bs_bits();
58
59        let ro = p.is_read_only();
60
61        if p.is_backing_dev() {
62            assert!(ro);
63        }
64
65        let cluster_shift: u8 = h.cluster_bits().try_into().unwrap();
66        let cluster_size: usize = 1usize
67            .checked_shl(cluster_shift.into())
68            .ok_or_else(|| format!("cluster_bits={} is too large", cluster_shift))?;
69        let refcount_order: u8 = h.refcount_order().try_into().unwrap();
70
71        //at least two l2 caches
72        let (l2_slice_bits, l2_cache_cnt) = match p.l2_cache {
73            Some((b, s)) => {
74                debug_assert!(b >= block_size_shift && u32::from(b) <= cluster_shift as u32);
75                assert!((s >> b) >= 2);
76                (b, s >> b)
77            }
78            None => {
79                let mapping_bytes = std::cmp::min(h.size() >> (cluster_shift - 3), 32 << 20);
80                let bits = 12_u8;
81                let cnt = std::cmp::max((mapping_bytes as usize) >> bits, 2);
82
83                (bits, cnt)
84            }
85        };
86
87        //at least two rb caches
88        let (rb_slice_bits, rb_cache_cnt) = match p.rb_cache {
89            Some((b, s)) => {
90                debug_assert!(b >= block_size_shift && u32::from(b) <= cluster_shift as u32);
91                assert!((s >> b) >= 2);
92                (b, s >> b)
93            }
94            None => {
95                let mapping_bytes = 256 << 10;
96                let bits = 12_u8;
97                let cnt = std::cmp::max((mapping_bytes as usize) >> bits, 2);
98
99                (bits, cnt)
100            }
101        };
102
103        //todo: support extended l2
104        let l2_entries = cluster_size / std::mem::size_of::<u64>();
105        let l2_slice_entries: u32 = (l2_entries as u32) >> (cluster_shift - l2_slice_bits);
106        let rb_entries = cluster_size * 8 / (1 << refcount_order);
107        let rb_slice_entries: u32 = (1 << (rb_slice_bits + 3)) >> refcount_order;
108
109        Ok(Qcow2Info {
110            virtual_size: h.size(),
111            refcount_order,
112            cluster_shift,
113            l2_slice_entries,
114            in_cluster_offset_mask: cluster_size - 1,
115            l2_index_mask: l2_entries - 1,
116            l2_index_shift: l2_entries.trailing_zeros().try_into().unwrap(),
117            l2_slice_index_shift: l2_slice_entries.trailing_zeros().try_into().unwrap(),
118            rb_index_mask: rb_entries - 1,
119            rb_index_shift: rb_entries.trailing_zeros().try_into().unwrap(),
120            block_size_shift,
121            l2_slice_bits,
122            l2_cache_cnt: l2_cache_cnt.try_into().unwrap(),
123            rb_cache_cnt: rb_cache_cnt.try_into().unwrap(),
124            flags: if ro { Qcow2Info::READ_ONLY } else { 0 }
125                | if h.backing_filename().is_some() {
126                    Qcow2Info::HAS_BACK_FILE
127                } else {
128                    0
129                }
130                | if p.is_backing_dev() {
131                    Qcow2Info::BACK_FILE
132                } else {
133                    0
134                },
135            rb_slice_bits,
136            rb_slice_index_shift: rb_slice_entries.trailing_zeros().try_into().unwrap(),
137        })
138    }
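    // Worked example (illustrative, not from the original sources): for the
    // common qcow2 defaults of cluster_bits = 16 (64 KiB clusters) and
    // refcount_order = 4 (16-bit refcounts), with the default 4 KiB slices
    // (l2_slice_bits = rb_slice_bits = 12) chosen above:
    //
    //   l2_entries       = 65536 / 8            = 8192
    //   l2_slice_entries = 8192 >> (16 - 12)    = 512
    //   rb_entries       = 65536 * 8 / 16       = 32768
    //   rb_slice_entries = (1 << (12 + 3)) >> 4 = 2048
    //
    // and the derived shifts are l2_index_shift = 13, l2_slice_index_shift = 9,
    // rb_index_shift = 15 and rb_slice_index_shift = 11.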
139
140    #[inline(always)]
141    pub fn rb_entries(&self) -> usize {
142        (self.cluster_size() << 3) >> self.refcount_order
143    }
144
145    #[inline(always)]
146    pub fn l2_entries(&self) -> usize {
147        self.cluster_size() / std::mem::size_of::<u64>()
148    }
149
150    #[inline(always)]
151    pub fn virtual_size(&self) -> u64 {
152        self.virtual_size
153    }
154
155    #[inline(always)]
156    pub fn in_cluster_offset(&self, offset: u64) -> usize {
157        offset as usize & self.in_cluster_offset_mask
158    }
159
160    #[inline(always)]
161    pub fn cluster_size(&self) -> usize {
162        1 << self.cluster_shift
163    }
164
165    #[inline(always)]
166    pub fn cluster_bits(&self) -> usize {
167        self.cluster_shift as usize
168    }
169
170    #[inline(always)]
171    pub fn refcount_order(&self) -> u8 {
172        self.refcount_order
173    }
174
175    #[inline]
176    pub(crate) fn __max_l1_entries(size: u64, cluster_bits: usize, l2_entries: usize) -> usize {
177        let size_per_entry = (l2_entries as u64) << cluster_bits;
178        let max_entries = Qcow2Header::MAX_L1_SIZE as usize / size_of::<u64>();
179        let entries = ((size + size_per_entry - 1) / size_per_entry) as usize;
180
181        std::cmp::min(entries, max_entries)
182    }
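    // Worked example (illustrative numbers): for a 10 GiB image with
    // cluster_bits = 16, l2_entries = 8192, so each L1 entry covers
    // 8192 << 16 = 512 MiB and ceil(10 GiB / 512 MiB) = 20 L1 entries are
    // needed, well below the Qcow2Header::MAX_L1_SIZE / 8 cap.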
183
184    #[inline]
185    pub(crate) fn get_max_l1_entries(size: u64, cluster_bits: usize) -> usize {
186        let l2_entries = (1usize << cluster_bits) / size_of::<u64>();
187
188        Self::__max_l1_entries(size, cluster_bits, l2_entries)
189    }
190
191    #[inline(always)]
192    pub(crate) fn max_l1_entries(&self) -> usize {
193        Self::__max_l1_entries(
194            self.virtual_size,
195            self.cluster_shift as usize,
196            self.l2_entries(),
197        )
198    }
199
200    #[inline]
201    pub(crate) fn __max_l1_size(max_l1_entries: usize, bs: usize) -> usize {
202        let entries = max_l1_entries;
203
204        (entries * size_of::<u64>() + bs - 1) & !(bs - 1)
205    }
206
207    #[allow(dead_code)]
208    #[inline(always)]
209    pub(crate) fn max_l1_size(&self) -> usize {
210        let entries = self.max_l1_entries();
211
212        Self::__max_l1_size(entries, 1 << self.block_size_shift)
213    }
214
215    pub(crate) fn __max_refcount_table_size(
216        size: u64,
217        cluster_size: usize,
218        refcount_order: u8,
219        bs: usize,
220    ) -> usize {
221        let rb_entries = (cluster_size as u64) * 8 / (1 << refcount_order);
222        let rt_entry_size = rb_entries * (cluster_size as u64);
223
224        let rc_table_entries = (size + rt_entry_size - 1) / rt_entry_size;
225        let rc_table_size =
226            ((rc_table_entries as usize * std::mem::size_of::<u64>()) + bs - 1) & !(bs - 1);
227
228        std::cmp::min(rc_table_size, 8usize << 20)
229    }
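    // Worked example (illustrative numbers): with 64 KiB clusters and
    // refcount_order = 4, rb_entries = 32768, so one reftable entry covers
    // 32768 * 64 KiB = 2 GiB of host space.  A 100 GiB image then needs
    // ceil(100 GiB / 2 GiB) = 50 reftable entries, i.e. 400 bytes, which is
    // rounded up to one logical block and capped at 8 MiB.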
230
231    #[inline(always)]
232    fn rb_slice_entries(&self) -> u32 {
233        (1 << (self.rb_slice_bits + 3)) >> self.refcount_order
234    }
235
236    #[inline(always)]
237    fn is_read_only(&self) -> bool {
238        self.flags & Qcow2Info::READ_ONLY != 0
239    }
240
241    #[inline(always)]
242    pub(crate) fn has_back_file(&self) -> bool {
243        self.flags & Qcow2Info::HAS_BACK_FILE != 0
244    }
245
246    #[inline(always)]
247    pub fn is_back_file(&self) -> bool {
248        self.flags & Qcow2Info::BACK_FILE != 0
249    }
250}
251
/// host cluster offset helper for the cluster allocator
253#[derive(Debug, Clone)]
254struct HostCluster(u64);
255
256impl HostCluster {
257    #[inline(always)]
258    fn cluster_off_from_slice(&self, info: &Qcow2Info, idx: usize) -> u64 {
259        self.rb_slice_host_start(info) + ((idx as u64) << info.cluster_shift)
260    }
261
262    #[inline(always)]
263    fn rt_index(&self, info: &Qcow2Info) -> usize {
264        let bits = info.rb_index_shift + info.cluster_bits() as u8;
265
266        (self.0 >> bits).try_into().unwrap()
267    }
268
269    #[inline(always)]
270    fn rb_index(&self, info: &Qcow2Info) -> usize {
271        let cluster_idx = self.0 >> info.cluster_shift;
272
273        cluster_idx as usize & info.rb_index_mask
274    }
275
276    #[inline(always)]
277    fn rb_slice_index(&self, info: &Qcow2Info) -> usize {
278        let off = self.0 >> info.cluster_shift;
279        off as usize & ((info.rb_slice_entries() - 1) as usize)
280    }
281
282    #[inline(always)]
283    fn rb_slice_key(&self, info: &Qcow2Info) -> usize {
284        (self.0 >> (info.cluster_shift + info.rb_slice_index_shift)) as usize
285    }
286
287    #[inline(always)]
288    fn rb_slice_host_start(&self, info: &Qcow2Info) -> u64 {
289        self.0 & !((1 << (info.cluster_shift + info.rb_slice_index_shift)) - 1)
290    }
291
292    #[inline(always)]
293    fn rb_slice_host_end(&self, info: &Qcow2Info) -> u64 {
294        self.rb_slice_host_start(info) + (info.rb_slice_entries() << info.cluster_bits()) as u64
295    }
296
297    #[inline(always)]
298    fn rb_host_start(&self, info: &Qcow2Info) -> u64 {
299        self.0 & !((1 << (info.cluster_shift + info.rb_index_shift)) - 1)
300    }
301
302    #[inline(always)]
303    fn rb_host_end(&self, info: &Qcow2Info) -> u64 {
304        self.rb_host_start(info) + (info.rb_entries() << info.cluster_bits()) as u64
305    }
306
307    #[inline(always)]
308    fn rb_slice_off_in_table(&self, info: &Qcow2Info) -> usize {
309        let rb_idx = self.rb_index(info);
310
311        (rb_idx >> info.rb_slice_index_shift) << info.rb_slice_bits
312    }
313}
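// Worked example (illustrative, using the default geometry above with
// cluster_shift = 16, rb_index_shift = 15, rb_slice_index_shift = 11):
// for HostCluster(0x1_2345_0000),
//
//   rt_index              = 0x1_2345_0000 >> (16 + 15)     = 2
//   rb_index              = (0x1_2345_0000 >> 16) & 0x7fff = 0x2345
//   rb_slice_index        = (0x1_2345_0000 >> 16) & 0x7ff  = 0x345
//   rb_slice_key          = 0x1_2345_0000 >> (16 + 11)     = 0x24
//   rb_slice_host_start   = 0x1_2345_0000 & !0x7ff_ffff    = 0x1_2000_0000
//   rb_slice_off_in_table = (0x2345 >> 11) << 12           = 0x4000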
314
315#[derive(Debug, Clone, Default)]
316pub struct Qcow2DevParams {
317    pub(crate) rb_cache: Option<(u8, usize)>,
318    pub(crate) l2_cache: Option<(u8, usize)>,
319    bs_shift: u8,
320    direct_io: bool,
321    read_only: RefCell<bool>,
322    backing: RefCell<Option<bool>>,
323}
324
325impl Qcow2DevParams {
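    /// Build the open parameters for a qcow2 device.
    ///
    /// A minimal usage sketch (assuming a 512-byte logical block size and the
    /// default cache geometry; not taken from the original sources):
    ///
    /// ```ignore
    /// let params = Qcow2DevParams::new(9, None, None, false, false);
    /// assert_eq!(params.get_bs_bits(), 9);
    /// assert!(!params.is_read_only() && !params.is_direct_io());
    /// ```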
326    pub fn new(
327        bs_bits: u8,
328        rb_cache: Option<(u8, usize)>,
329        l2_cache: Option<(u8, usize)>,
330        ro: bool,
331        dio: bool,
332    ) -> Self {
333        Qcow2DevParams {
334            bs_shift: bs_bits,
335            rb_cache,
336            l2_cache,
337            read_only: std::cell::RefCell::new(ro),
338            direct_io: dio,
339            backing: std::cell::RefCell::new(None),
340        }
341    }
342
343    pub fn get_bs_bits(&self) -> u8 {
344        self.bs_shift
345    }
346
347    pub fn set_read_only(&self, ro: bool) {
348        *self.read_only.borrow_mut() = ro;
349    }
350
351    pub fn is_direct_io(&self) -> bool {
352        self.direct_io
353    }
354
355    pub fn is_read_only(&self) -> bool {
356        *self.read_only.borrow()
357    }
358
359    pub fn mark_backing_dev(&self, backing: Option<bool>) {
360        *self.backing.borrow_mut() = backing;
361
362        self.set_read_only(true);
363    }
364
365    pub fn is_backing_dev(&self) -> bool {
366        (*self.backing.borrow()).is_some()
367    }
368}
369
370type L2TableHandle = AsyncRwLock<L2Table>;
371
372pub struct Qcow2Dev<T> {
373    path: PathBuf,
374    header: AsyncRwLock<Qcow2Header>,
375
376    // mapping table
377    l1table: AsyncRwLock<L1Table>,
378    l2cache: AsyncLruCache<usize, L2TableHandle>,
379
    // slices share a single cluster, so before flushing any cluster
    // which is used for slices, discard this cluster first, then any
    // new slice stored in this cluster can be loaded with correct data
    //
    // the per-cluster flag records the discard state: false means the
    // cluster hasn't been discarded yet, true means it has been (or is
    // being) discarded
    //
    // used by both the mapping table and the allocator
389    new_cluster: AsyncRwLock<HashMap<u64, AsyncRwLock<bool>>>,
390
391    // allocator
392    free_cluster_offset: AtomicU64,
393    reftable: AsyncRwLock<RefTable>,
394    refblock_cache: AsyncLruCache<usize, AsyncRwLock<RefBlock>>,
395
    // set whenever any dirty metadata is produced
397    need_flush: AtomicBool,
398    flush_lock: AsyncMutex<()>,
399
400    file: T,
401    backing_file: Option<Box<Qcow2Dev<T>>>,
402    pub info: Qcow2Info,
403}
404
405impl<T> std::fmt::Debug for Qcow2Dev<T> {
406    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
407        let _ = write!(f, "Image path {:?}\ninfo {:?}\n", &self.path, &self.info);
408        let _ = match &self.backing_file {
409            Some(b) => write!(f, "backing {:?}", b),
410            _ => write!(f, "backing None"),
411        };
412        writeln!(f)
413    }
414}
415
416impl<T: Qcow2IoOps> Qcow2Dev<T> {
417    pub fn new(
418        path: &Path,
419        header: Qcow2Header,
420        params: &Qcow2DevParams,
421        file: T,
422    ) -> Qcow2Result<Self> {
423        let h = &header;
424        let bs_shift = params.get_bs_bits();
425
426        debug_assert!((9..=12).contains(&bs_shift));
427        let info = Qcow2Info::new(h, params)?;
428
429        let l2_cache_cnt = info.l2_cache_cnt as usize;
430        let rb_cache_cnt = info.rb_cache_cnt as usize;
431        let l1_size = Qcow2Info::__max_l1_size(
432            Qcow2Info::get_max_l1_entries(h.size(), h.cluster_bits().try_into().unwrap()),
433            1 << bs_shift,
434        );
435        let rt_size = h.reftable_clusters() << h.cluster_bits();
436        let l1_entries = h.l1_table_entries() as u32;
437
438        log::info!(
439            "l2 slice cache(bits: {} count {}), rb cache(bits: {} count {})",
440            info.l2_slice_bits,
441            l2_cache_cnt,
442            info.rb_slice_bits,
443            rb_cache_cnt,
444        );
445
446        let dev = Qcow2Dev {
447            path: path.to_path_buf(),
448            header: AsyncRwLock::new(header),
449            file,
450            backing_file: None,
451            info,
452            l1table: AsyncRwLock::new(L1Table::new(None, l1_size, l1_entries, bs_shift)),
453            l2cache: AsyncLruCache::new(l2_cache_cnt),
454            free_cluster_offset: AtomicU64::new(0),
455            reftable: AsyncRwLock::new(RefTable::new(None, rt_size, bs_shift)),
456            refblock_cache: AsyncLruCache::new(rb_cache_cnt),
457            new_cluster: AsyncRwLock::new(Default::default()),
458            need_flush: AtomicBool::new(false),
459            flush_lock: AsyncMutex::new(()),
460        };
461
462        Ok(dev)
463    }
464
465    async fn cluster_is_new(&self, cluster: u64) -> bool {
466        let map = self.new_cluster.read().await;
467
468        map.contains_key(&cluster)
469    }
470
471    async fn mark_new_cluster(&self, cluster: u64) {
472        let mut map = self.new_cluster.write().await;
473
474        map.insert(cluster, AsyncRwLock::new(false));
475    }
476
477    async fn clear_new_cluster(&self, cluster: u64) {
478        let mut map = self.new_cluster.write().await;
479
480        map.remove(&cluster);
481    }
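    // Lifecycle of a "new" cluster (summary added for clarity; the step names
    // are descriptive, not extra API):
    //
    //   1. the allocator reserves a meta cluster and calls mark_new_cluster()
    //   2. slices placed in it are built from in-flight data and cached dirty,
    //      without reading the (still uninitialized) cluster from disk
    //   3. flush_cache_entries() zeroes/discards the cluster exactly once
    //      before writing any dirty slice that lives in it
    //   4. the entry is removed from new_cluster (clear_new_cluster(), or
    //      directly under the write lock once the discard has completed)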
482
483    /// Setup the backing Qcow2 device
484    pub fn set_backing_dev(&mut self, back: Box<Qcow2Dev<T>>) {
485        self.backing_file = Some(back);
486    }
487
488    #[inline]
489    async fn call_read(&self, offset: u64, buf: &mut [u8]) -> Qcow2Result<usize> {
490        log::trace!("read_to off {:x} len {}", offset, buf.len());
491        self.file.read_to(offset, buf).await
492    }
493
494    #[inline]
495    async fn call_write(&self, offset: u64, buf: &[u8]) -> Qcow2Result<()> {
496        log::trace!("write_from off {:x} len {}", offset, buf.len());
497        self.file.write_from(offset, buf).await
498    }
499
500    #[inline]
501    async fn call_fallocate(&self, offset: u64, len: usize, flags: u32) -> Qcow2Result<()> {
502        log::trace!("fallocate off {:x} len {}", offset, len);
503        let res = self.file.fallocate(offset, len, flags).await;
504        match res {
505            Err(_) => {
506                let mut zero_data = Qcow2IoBuf::<u8>::new(len);
507
508                log::trace!("discard fallback off {:x} len {}", offset, len);
509                zero_data.zero_buf();
510                self.call_write(offset, &zero_data).await
511            }
512            Ok(_) => Ok(()),
513        }
514    }
515
516    /// flush data range in (offset, len) to disk
517    #[inline]
518    async fn call_fsync(&self, offset: u64, len: usize, flags: u32) -> Qcow2Result<()> {
519        log::trace!("fsync off {:x} len {} flags {}", offset, len, flags);
520        self.file.fsync(offset, len, flags).await
521    }
522
523    async fn load_top_table<B: Table>(&self, top: &AsyncRwLock<B>, off: u64) -> Qcow2Result<usize> {
524        let mut t = top.write().await;
525
526        if t.is_update() {
527            return Ok(0);
528        }
529
530        t.set_offset(Some(off));
531        let buf = unsafe { std::slice::from_raw_parts_mut(t.as_mut_ptr(), t.byte_size()) };
532        self.call_read(off, buf).await
533    }
534
535    async fn load_refcount_table(&self) -> Qcow2Result<usize> {
536        let h = self.header.read().await;
537        self.load_top_table(&self.reftable, h.reftable_offset())
538            .await
539    }
540
541    async fn load_l1_table(&self) -> Qcow2Result<usize> {
542        let h = self.header.read().await;
543        self.load_top_table(&self.l1table, h.l1_table_offset())
544            .await
545    }
546
547    async fn get_l1_entry(&self, split: &SplitGuestOffset) -> Qcow2Result<L1Entry> {
548        let l1_index = split.l1_index(&self.info);
549        let res = {
550            let handle = self.l1table.read().await;
551            if handle.is_update() {
552                Some(handle.get(l1_index))
553            } else {
554                None
555            }
556        };
557
558        let l1_entry = match res {
559            None => self.l1table.read().await.get(l1_index),
560            Some(entry) => entry,
561        };
562
563        Ok(l1_entry)
564    }
565
566    #[inline]
567    async fn add_l2_slice(
568        &self,
569        l1_e: &L1Entry,
570        key: usize,
571        slice_off: usize,
572        slice: L2Table,
573    ) -> Qcow2Result<()> {
574        match self
575            .add_cache_slice(&self.l2cache, l1_e, key, slice_off, slice)
576            .await?
577        {
578            Some(to_kill) => {
579                log::warn!("add_l2_slice: cache eviction, slices {}", to_kill.len());
580                // figure exact dependency on refcount cache & reftable entries
581                self.flush_refcount().await?;
582                self.flush_cache_entries(to_kill).await
583            }
584            _ => Ok(()),
585        }
586    }
587
588    #[inline]
589    async fn get_l2_slice_slow(
590        &self,
591        l1_e: &L1Entry,
592        split: &SplitGuestOffset,
593    ) -> Qcow2Result<AsyncLruCacheEntry<AsyncRwLock<L2Table>>> {
594        let info = &self.info;
595        let key = split.l2_slice_key(info);
596        let l2_cache = &self.l2cache;
597
598        log::debug!(
599            "get_l2_slice_slow: l1_e {:x} virt_addr {:x}",
600            l1_e.get_value(),
601            split.guest_addr(),
602        );
603
604        self.add_l2_slice(
605            l1_e,
606            key,
607            split.l2_slice_off_in_table(info),
608            L2Table::new(None, 1 << info.l2_slice_bits, info.cluster_bits()),
609        )
610        .await?;
611
612        if let Some(entry) = l2_cache.get(key) {
613            Ok(entry)
614        } else {
615            Err("Fail to load l2 table".into())
616        }
617    }
618
619    #[inline]
620    async fn get_l2_slice(
621        &self,
622        split: &SplitGuestOffset,
623    ) -> Qcow2Result<AsyncLruCacheEntry<AsyncRwLock<L2Table>>> {
624        let key = split.l2_slice_key(&self.info);
625
626        match self.l2cache.get(key) {
627            Some(entry) => Ok(entry),
628            None => {
629                let l1_e = self.get_l1_entry(split).await?;
630                self.get_l2_slice_slow(&l1_e, split).await
631            }
632        }
633    }
634
635    async fn flush_cache_entries<B: Table>(
636        &self,
637        v: Vec<(usize, AsyncLruCacheEntry<AsyncRwLock<B>>)>,
638    ) -> Qcow2Result<()> {
639        let info = &self.info;
640        let tv = &v;
641        let mut f_vec = Vec::new();
642
        // Track 'new' clusters used by refcount/mapping table slices; once a
        // cluster is inserted into this set, it is being discarded. After the
        // discard completes, the related slices in this cluster are flushed.
        //
        // The order does matter.
        //
        // We have to guarantee that each new cluster is discarded once, and
        // exactly once, before any dirty slice in it is flushed.
651        let mut cluster_map = HashMap::new();
652
        // Hold each rb/l2 slice's read lock during the write-out, so the
        // slice can't be updated while this dirty cache is being flushed.
        //
        // But a new cluster can't be removed from the self.new_cluster set
        // until its discard is done, so any new cache slice loaded while the
        // cluster is being discarded can still be built from in-flight data.
        //
660        let mut cache_vec = Vec::new();
661
662        log::info!("flush caches: count {}", v.len());
663
664        //discard first
665        {
666            for (_, e) in tv {
667                if e.is_dirty() {
668                    // For any cache update and set_dirty(true), write lock
669                    // has to be obtained
670                    let cache = e.value().read().await;
671
672                    // clearing dirty now since cache update won't happen now,
673                    // and dirty is only used for flushing cache.
674                    e.set_dirty(false);
675
676                    match cache.get_offset() {
677                        Some(cache_off) => {
678                            let key = cache_off >> info.cluster_bits();
679
680                            if !cluster_map.contains_key(&key) && self.cluster_is_new(key).await {
681                                let cls_map = self.new_cluster.read().await;
682                                // keep this cluster locked, so that concurrent discard can
683                                // be avoided
684                                let mut locked_cls = cls_map.get(&key).unwrap().write().await;
685
686                                log::debug!(
687                                    "flush_cache_entries: discard cluster {:x} done {}",
688                                    cache_off & !((1 << info.cluster_bits()) - 1),
689                                    *locked_cls
690                                );
691                                if !(*locked_cls) {
692                                    // mark it as discarded, so others can observe it after
693                                    // grabbing write lock
694                                    *locked_cls = true;
695                                    if cls_map.contains_key(&key) {
696                                        f_vec.push(self.call_fallocate(
697                                            cache_off & !((1 << info.cluster_bits()) - 1),
698                                            1 << info.cluster_bits(),
699                                            Qcow2OpsFlags::FALLOCATE_ZERO_RAGE,
700                                        ));
701                                        cluster_map.insert(key, locked_cls);
702                                    }
703                                }
704                            }
705                        }
706                        _ => {
707                            eprintln!("flush cache: dirty cache without offset");
708                        }
709                    }
                    // holding this cache's read lock until this flush is done
711                    cache_vec.push(cache);
712                }
713            }
714        }
715
716        futures::future::join_all(f_vec).await;
717
718        {
719            let mut cls_map = self.new_cluster.write().await;
720
721            for (cls_key, _locked_cls) in cluster_map {
722                cls_map.remove(&cls_key);
723
724                // _locked_cls drops after this entry is removed from
725                // new cluster map
726            }
727        }
728
729        let mut f_vec = Vec::new();
730        for cache in cache_vec.iter() {
731            let off = cache.get_offset().unwrap();
732            let buf = unsafe { std::slice::from_raw_parts(cache.as_ptr(), cache.byte_size()) };
733            log::trace!(
734                "flush_cache_entries: cache {} offset {:x}",
735                qcow2_type_of(cache),
736                off
737            );
738            f_vec.push(self.call_write(off, buf));
739        }
740
741        let res = futures::future::join_all(f_vec).await;
742        for r in res {
743            if r.is_err() {
744                eprintln!("cache slice write failed {:?}\n", r);
745                return r;
746            }
747        }
748
749        //each cache's read lock drops here
750
751        Ok(())
752    }
753
    /// true if the refblock cache holding refcount block slices is empty
755    pub fn refblock_cache_is_empty(&self) -> bool {
756        self.refblock_cache.is_empty()
757    }
758
    /// true if the l2 cache holding L2 table slices is empty
760    pub fn l2_cache_is_empty(&self) -> bool {
761        self.l2cache.is_empty()
762    }
763
    /// shrink both the refblock and l2 caches so their memory can be
    /// released; often called when the qcow2 device is idle
766    pub async fn shrink_caches(&self) -> Qcow2Result<()> {
767        self.flush_meta().await?;
768        self.refblock_cache.shrink();
769        self.l2cache.shrink();
770
771        Ok(())
772    }
773
774    async fn flush_cache<C: Table>(
775        &self,
776        cache: &AsyncLruCache<usize, AsyncRwLock<C>>,
777        start: usize,
778        end: usize,
779    ) -> Qcow2Result<bool> {
780        let entries = cache.get_dirty_entries(start, end);
781
782        if !entries.is_empty() {
783            log::debug!(
784                "flush_cache: type {} {:x} - {:x}",
785                qcow2_type_of(cache),
786                start,
787                end,
788            );
789
790            self.flush_cache_entries(entries).await?;
791            Ok(true)
792        } else {
793            Ok(false)
794        }
795    }
796
797    async fn flush_table<B: Table>(&self, t: &B, start: u32, size: usize) -> Qcow2Result<()> {
798        let off = t.get_offset().unwrap() + start as u64;
799        let buf = unsafe {
800            std::slice::from_raw_parts(((t.as_ptr() as u64) + start as u64) as *const u8, size)
801        };
802        self.call_write(off, buf).await
803    }
804
805    async fn flush_top_table<B: Table>(&self, rt: &B) -> Qcow2Result<()> {
806        while let Some(idx) = rt.pop_dirty_blk_idx(None) {
807            let start = idx << self.info.block_size_shift;
808            let size = 1 << self.info.block_size_shift;
809            self.flush_table(rt, start, size).await?
810        }
811
812        Ok(())
813    }
814
    // Flush one kind of metadata: L2 tables with the L1 table, or refcount
    // blocks with the refcount table.
    //
    // It is a generic helper, hence the name flush_meta_generic()
819    async fn flush_meta_generic<A: Table + std::fmt::Debug, B: Table, F>(
820        &self,
821        rt: &A,
822        cache: &AsyncLruCache<usize, AsyncRwLock<B>>,
823        key_fn: F,
824    ) -> Qcow2Result<bool>
825    where
826        F: Fn(u64) -> usize,
827    {
828        let bs_bits = self.info.block_size_shift;
829
830        if let Some(idx) = rt.pop_dirty_blk_idx(None) {
831            let start = key_fn((idx as u64) << bs_bits);
832            let end = key_fn(((idx + 1) as u64) << bs_bits);
833
834            if self.flush_cache(cache, start, end).await? {
                // order the cache flush before the upper-layer table write
836                self.call_fsync(0, usize::MAX, 0).await?;
837            }
838            self.flush_table(rt, idx << bs_bits, 1 << bs_bits).await?;
839            Ok(false)
840        } else {
841            // flush cache without holding top table read lock
842            if self.flush_cache(cache, 0, usize::MAX).await? {
843                self.call_fsync(0, usize::MAX, 0).await?;
844            }
845            Ok(true)
846        }
847    }
848
    // flush dirty refcount blocks and the refcount table to disk
850    async fn flush_refcount(&self) -> Qcow2Result<()> {
851        let info = &self.info;
852
853        loop {
854            let rt = &*self.reftable.read().await;
855            let done = self
856                .flush_meta_generic(rt, &self.refblock_cache, |off| {
857                    let rt_idx: u64 = off >> 3;
858                    let host_cls = (rt_idx << info.rb_index_shift) << info.cluster_bits();
859                    let k = HostCluster(host_cls);
860                    k.rb_slice_key(info)
861                })
862                .await?;
863            if done {
864                break;
865            }
866        }
867        Ok(())
868    }
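    // Worked example for the key_fn above (illustrative, default geometry,
    // block_size_shift = 12): dirty reftable block idx 1 starts at byte
    // offset 4096, i.e. rt_idx = 4096 >> 3 = 512, whose first host cluster is
    // 512 << 15 << 16 = 1 TiB; its refblock-slice key is therefore
    // (1 TiB) >> (16 + 11) = 8192, so the flush covers keys [8192, 16384).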
869
    // flush the mapping (L1/L2) cache
871    async fn flush_mapping(&self, l1: &L1Table) -> Qcow2Result<()> {
872        let info = &self.info;
873
874        loop {
875            let done = self
876                .flush_meta_generic(l1, &self.l2cache, |off| {
877                    let l1_idx: u64 = off >> 3;
878                    let virt_addr = (l1_idx << info.l2_index_shift) << info.cluster_bits();
879                    let k = SplitGuestOffset(virt_addr);
880                    k.l2_slice_key(info)
881                })
882                .await?;
883            if done {
884                break;
885            }
886        }
887        Ok(())
888    }
889
890    /// for flushing data in the qcow2 virtual range to disk
891    pub async fn fsync_range(&self, off: u64, len: usize) -> Qcow2Result<()> {
892        self.call_fsync(off, len, 0).await
893    }
894
    /// flush dirty metadata in RAM to disk
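    ///
    /// A minimal usage sketch (assuming an opened, writable `dev`; not taken
    /// from the original sources):
    ///
    /// ```ignore
    /// // write back any dirty L2/refcount slices plus the L1/refcount tables
    /// dev.flush_meta().await?;
    /// ```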
896    pub async fn flush_meta(&self) -> Qcow2Result<()> {
897        let info = &self.info;
898        let _flush_lock = self.flush_lock.lock().await;
899
900        log::debug!("flush_meta: entry");
901        loop {
            // refcount metadata is usually small and contiguous, so simply
            // flush all of it
904            self.flush_refcount().await?;
905
            // the read lock prevents updates to the L1 table; meanwhile
            // normal reads and cache-hit writes can proceed without any
            // problem
909            let l1 = &*self.l1table.read().await;
910
911            let done = self
912                .flush_meta_generic(l1, &self.l2cache, |off| {
913                    let l1_idx: u64 = off >> 3;
914                    let virt_addr = (l1_idx << info.l2_index_shift) << info.cluster_bits();
915                    let k = SplitGuestOffset(virt_addr);
916                    k.l2_slice_key(info)
917                })
918                .await?;
919            if done {
920                self.mark_need_flush(false);
921                break;
922            }
923        }
924        log::debug!("flush_meta: exit");
925        Ok(())
926    }
927
    // If we are running out of reftable entries, allocate more clusters and
    // replace the current refcount table with a new one.
    //
    // All dirty refcount blocks need to be flushed before flushing out the new
    // reftable.
    //
    // Very slow code path.
935    async fn grow_reftable(
936        &self,
937        reftable: &LockWriteGuard<RefTable>,
938        grown_rt: &mut RefTable,
939    ) -> Qcow2Result<()> {
940        let info = &self.info;
941        let new_rt_clusters = grown_rt.cluster_count(info);
942        if new_rt_clusters >= info.rb_entries() - 1 {
943            // 1 entry stays free so we can allocate this refblock by putting its refcount into
944            // itself
945            // TODO: Implement larger allocations
946            return Err(format!(
947                "The reftable needs to grow to {} bytes, but we can allocate only {} -- try \
948                     increasing the cluster size",
949                new_rt_clusters * info.cluster_size(),
950                (info.rb_entries() - 1) * info.cluster_size(),
951            )
952            .into());
953        }
954
955        // Allocate new reftable, put its refcounts in a completely new refblock
956        let old_rt_offset = reftable.get_offset().unwrap();
957        let old_rt_clusters = reftable.cluster_count(info);
958
959        let rb_size = 1 << info.rb_slice_bits;
960        let mut new_refblock = RefBlock::new(info.refcount_order(), rb_size, None);
961
962        let refblock_offset =
963            (reftable.entries() as u64) << (info.rb_index_shift + info.cluster_shift);
964        new_refblock.set_offset(Some(refblock_offset));
965        let rt_offset = refblock_offset + info.cluster_size() as u64;
966        grown_rt.set_offset(Some(rt_offset));
967
968        // Reference for the refblock
969        new_refblock.increment(0).unwrap();
970        // References for the reftable
971        for i in 1..(new_rt_clusters + 1) {
972            new_refblock.increment(i).unwrap();
973        }
974
975        let cls = HostCluster(refblock_offset);
976
977        let rb_before = self.call_fallocate(
978            cls.rb_slice_host_start(info),
979            (refblock_offset - cls.rb_slice_host_start(info))
980                .try_into()
981                .unwrap(),
982            Qcow2OpsFlags::FALLOCATE_ZERO_RAGE,
983        );
984        let rb = self.flush_table(&new_refblock, 0, new_refblock.byte_size());
985        let rb_after = self.call_fallocate(
986            refblock_offset + rb_size as u64,
987            cls.rb_slice_host_end(info) as usize - refblock_offset as usize - rb_size,
988            Qcow2OpsFlags::FALLOCATE_ZERO_RAGE,
989        );
990        let (res0, res1, res2) = futures::join!(rb_before, rb, rb_after);
991        if res0.is_err() || res1.is_err() || res2.is_err() {
992            return Err("Failed to flush refcount block or discard other parts".into());
993        }
994
995        //todo: write all dirty refcount_block
996
997        grown_rt.set_refblock_offset(reftable.entries(), refblock_offset);
998        self.flush_top_table(grown_rt).await?;
999
1000        // write header
1001        {
1002            let mut h = self.header.write().await;
1003
1004            h.set_reftable(
1005                grown_rt.get_offset().unwrap(),
1006                grown_rt.cluster_count(&self.info),
1007            )?;
1008
1009            let buf = h.serialize_to_buf()?;
1010            if let Err(err) = self.call_write(0, &buf).await {
1011                h.set_reftable(old_rt_offset, old_rt_clusters).unwrap();
1012                return Err(err);
1013            }
1014        }
1015
1016        self.free_clusters(old_rt_offset, old_rt_clusters).await?;
1017
1018        Ok(())
1019    }
1020
1021    async fn get_reftable_entry(&self, rt_idx: usize) -> RefTableEntry {
1022        let reftable = self.reftable.read().await;
1023        reftable.get(rt_idx)
1024    }
1025
1026    /// free allocated clusters
1027    async fn free_clusters(&self, mut host_cluster: u64, mut count: usize) -> Qcow2Result<()> {
1028        let info = &self.info;
1029        let mut first_zero = true;
1030
1031        log::info!("free_clusters start {:x} num {}", host_cluster, count);
1032        while count > 0 {
1033            let cls = HostCluster(host_cluster);
1034            let mut rt_e = self.get_reftable_entry(cls.rt_index(info)).await;
1035
1036            if rt_e.is_zero() {
1037                rt_e = self.get_reftable_entry(cls.rt_index(info)).await;
1038            }
1039
1040            let rb_handle = match self.get_refblock(&cls, &rt_e).await {
1041                Ok(handle) => handle,
1042                Err(_) => {
1043                    let next_cls = cls.rb_slice_host_end(info);
1044                    if next_cls - host_cluster >= count as u64 {
1045                        return Err("Fail to load refblock in freeing cluster".into());
1046                    }
1047                    let skip = next_cls - host_cluster;
1048                    count -= skip as usize;
1049                    host_cluster = next_cls;
1050                    continue;
1051                }
1052            };
1053
1054            let mut refblock = rb_handle.value().write().await;
1055            let end = cls.rb_slice_host_end(info);
1056
1057            log::debug!(
1058                "free host_cls {:x} start {:x} end {:x} count {}",
1059                cls.0,
1060                cls.rb_slice_host_start(info),
1061                end,
1062                count
1063            );
1064
1065            while count > 0 && host_cluster < end {
1066                let cls = HostCluster(host_cluster);
1067                let slice_idx = cls.rb_slice_index(info);
1068
1069                refblock.decrement(slice_idx).unwrap();
1070                if refblock.get(slice_idx).is_zero() && first_zero {
1071                    self.free_cluster_offset
1072                        .fetch_min(host_cluster, Ordering::Relaxed);
1073                    first_zero = false;
1074                }
1075                count -= 1;
1076                host_cluster += 1 << info.cluster_bits();
1077            }
1078            rb_handle.set_dirty(true);
1079            self.mark_need_flush(true);
1080        }
1081
1082        Ok(())
1083    }
1084
1085    async fn add_cache_slice<B: Table + std::fmt::Debug, E: TableEntry>(
1086        &self,
1087        cache: &AsyncLruCache<usize, AsyncRwLock<B>>,
1088        top_e: &E,
1089        key: usize,
1090        slice_off: usize,
1091        slice: B,
1092    ) -> Qcow2Result<Option<Vec<(usize, AsyncLruCacheEntry<AsyncRwLock<B>>)>>> {
1093        let info = &self.info;
1094
1095        log::trace!(
1096            "add slice: entry {:x} slice key 0x{:<x} update {} type {} off {:x}",
1097            top_e.get_value(),
1098            key,
1099            slice.is_update(),
1100            crate::helpers::qcow2_type_of(&slice),
1101            slice_off
1102        );
1103
        // may return an entry added by another code path, but it is guaranteed
        // that we get one entry here
1106        let entry = cache.put_into_wmap_with(key, || AsyncRwLock::new(slice));
1107
        // hold the write lock so no one else can use this entry; the whole
        // cache lock isn't required, so lock waiting happens only on
        // this entry
1111        let mut slice = entry.value().write().await;
1112
        // if the slice is already up to date, it has been committed to the read map already
1114        if !slice.is_update() {
1115            let off = top_e.get_value() + slice_off as u64;
1116            slice.set_offset(Some(off));
1117
1118            if !self.cluster_is_new(off >> info.cluster_bits()).await {
1119                let buf = unsafe {
1120                    std::slice::from_raw_parts_mut(slice.as_mut_ptr(), slice.byte_size())
1121                };
1122                self.call_read(off, buf).await?;
1123                log::trace!("add_cache_slice: load from disk");
1124            } else {
1125                entry.set_dirty(true);
1126                self.mark_need_flush(true);
1127                log::trace!("add_cache_slice: build from inflight");
1128            }
1129
1130            //commit all populated caches and make them visible
1131            Ok(cache.commit_wmap())
1132        } else {
1133            log::trace!("add_cache_slice: slice is already update");
1134            Ok(None)
1135        }
1136    }
1137
1138    #[inline]
1139    async fn add_rb_slice(
1140        &self,
1141        rt_e: &RefTableEntry,
1142        key: usize,
1143        slice_off: usize,
1144        slice: RefBlock,
1145    ) -> Qcow2Result<()> {
1146        match self
1147            .add_cache_slice(&self.refblock_cache, rt_e, key, slice_off, slice)
1148            .await?
1149        {
1150            Some(to_kill) => {
1151                log::warn!("add_rb_slice: cache eviction, slices {}", to_kill.len());
1152                self.flush_cache_entries(to_kill).await
1153            }
1154            _ => Ok(()),
1155        }
1156    }
1157
1158    async fn get_refblock(
1159        &self,
1160        cls: &HostCluster,
1161        rt_e: &RefTableEntry,
1162    ) -> Qcow2Result<AsyncLruCacheEntry<AsyncRwLock<RefBlock>>> {
1163        let info = &self.info;
1164        let key = cls.rb_slice_key(info);
1165        let rb_cache = &self.refblock_cache;
1166
1167        // fast path
1168        if let Some(entry) = rb_cache.get(key) {
1169            return Ok(entry);
1170        }
1171
1172        self.add_rb_slice(
1173            rt_e,
1174            key,
1175            cls.rb_slice_off_in_table(info),
1176            RefBlock::new(info.refcount_order, 1 << info.rb_slice_bits, None),
1177        )
1178        .await?;
1179
1180        if let Some(entry) = rb_cache.get(key) {
1181            Ok(entry)
1182        } else {
1183            Err("Fail to load refcount block".into())
1184        }
1185    }
1186
    /// make sure the reftable entry points to a valid refcount block
1188    async fn ensure_refblock_offset(&self, cls: &HostCluster) -> Qcow2Result<RefTableEntry> {
1189        let info = &self.info;
1190
1191        let rt_index = cls.rt_index(info);
1192        {
1193            let reftable = self.reftable.read().await;
1194            let rt_entry = reftable.get(rt_index);
1195            if !rt_entry.is_zero() {
1196                return Ok(rt_entry);
1197            }
1198        }
1199
1200        let rt_clusters = {
1201            let h = self.header.read().await;
1202            h.reftable_clusters()
1203        };
1204
1205        let mut reftable = self.reftable.write().await;
1206        log::info!(
1207            "ensure rt entry: rt_idx {} rt_entries {} host_cluster {:x}",
1208            rt_index,
1209            reftable.entries(),
1210            cls.0
1211        );
1212        if !reftable.in_bounds(rt_index) {
1213            let mut grown_rt = reftable.clone_and_grow(rt_index, rt_clusters, info.cluster_size());
1214            if !grown_rt.is_update() {
1215                self.grow_reftable(&reftable, &mut grown_rt).await?;
1216            }
1217            *reftable = grown_rt;
1218        }
1219
1220        // Retry before allocating, maybe something has changed in the meantime
1221        let rt_entry = reftable.get(rt_index);
1222        if !rt_entry.is_zero() {
1223            return Ok(rt_entry);
1224        }
1225
1226        // always run background flushing
1227        let refblock_offset = (rt_index as u64) << (info.rb_index_shift + info.cluster_shift);
1228        reftable.set_refblock_offset(rt_index, refblock_offset);
1229        self.mark_need_flush(true);
1230
        // the `new` flag means two things:
        //
        // - the pointed-to refblock needn't be loaded from disk, and can be built
        // from in-flight data
        //
        // - when flushing a refblock slice, the cluster storing the refblock
        // needs to be discarded first
1238        self.mark_new_cluster(refblock_offset >> info.cluster_bits())
1239            .await;
1240
1241        let rt_e = reftable.get(rt_index);
1242
1243        log::debug!("allocate new refblock offset {:x}", refblock_offset);
1244        let mut new_refblock = RefBlock::new(info.refcount_order(), 1 << info.rb_slice_bits, None);
1245        new_refblock.increment(0).unwrap();
1246
        // add this refblock into the cache directly
        //
        // we have to add the slice here because the reference needs to be held
1250        self.add_rb_slice(
1251            &rt_e,
1252            cls.rb_slice_key(info),
1253            cls.rb_slice_off_in_table(info),
1254            new_refblock,
1255        )
1256        .await?;
1257
1258        log::debug!("ensure_refblock: done");
1259
1260        Ok(rt_e)
1261    }
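    // Worked example for the self-describing refblock allocation above
    // (illustrative, default geometry): each reftable entry covers
    // rb_entries * cluster_size = 32768 * 64 KiB = 2 GiB of host space, so
    // for rt_index = 2 the new refblock is placed at
    // 2 << (15 + 16) = 0x1_0000_0000 (4 GiB), i.e. the first cluster of the
    // 2 GiB chunk it describes, and increment(0) accounts for that cluster.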
1262
1263    // `fixed_start` means we can't change the specified allocation position
1264    // and count; otherwise, we may return the tail free clusters of this
1265    // slice.
1266    async fn try_alloc_from_rb_slice(
1267        &self,
1268        rt_e: &RefTableEntry,
1269        cls: &HostCluster,
1270        count: usize,
1271        fixed_start: bool,
1272    ) -> Qcow2Result<Option<(u64, usize)>> {
1273        let info = &self.info;
1274        let slice_entries = info.rb_slice_entries() as usize;
1275        let rb_slice_index = cls.rb_slice_index(info);
1276        if rb_slice_index + count > slice_entries {
1277            // Do not cross refblock boundaries; caller should look into the next refblock
1278            return Ok(None);
1279        }
1280
1281        let rb_handle = self.get_refblock(cls, rt_e).await?;
1282        let mut rb = rb_handle.value().write().await;
1283
1284        match rb.get_free_range(rb_slice_index, count) {
1285            Some(r) => {
1286                rb.alloc_range(r.start, r.end)?;
1287                rb_handle.set_dirty(true);
1288                self.mark_need_flush(true);
1289                Ok(Some((cls.cluster_off_from_slice(info, r.start), r.len())))
1290            }
1291            _ => {
1292                if fixed_start {
1293                    Ok(None)
1294                } else {
1295                    match rb.get_tail_free_range() {
1296                        Some(r) => {
1297                            rb.alloc_range(r.start, r.end)?;
1298                            rb_handle.set_dirty(true);
1299                            self.mark_need_flush(true);
1300                            Ok(Some((cls.cluster_off_from_slice(info, r.start), r.len())))
1301                        }
1302                        _ => Ok(None),
1303                    }
1304                }
1305            }
1306        }
1307    }
1308
1309    /// Try to allocate `count` clusters starting somewhere at or after `host_cluster`, but in the
1310    /// same refblock.  This function cannot cross refblock boundaries.
1311    async fn try_allocate_from(
1312        &self,
1313        mut host_cluster: u64,
1314        alloc_cnt: usize,
1315    ) -> Qcow2Result<Option<(u64, usize)>> {
1316        assert!(alloc_cnt > 0);
1317
1318        let info = &self.info;
1319        let cls = HostCluster(host_cluster);
1320        let rt_entry = self.ensure_refblock_offset(&cls).await?;
1321        let mut out_off = 0;
1322        let mut done = 0;
1323        let mut count = alloc_cnt;
1324
        // run cross-refblock-slice allocation; we are allowed to return fewer
        // clusters than requested
1327        while count > 0 && host_cluster < cls.rb_host_end(info) {
1328            let cls = HostCluster(host_cluster);
1329            let curr_cnt = std::cmp::min(count, info.rb_slice_entries() as usize);
1330
            // run non-fixed allocation for the 1st slice; every following allocation
            // has to be contiguous so the whole range stays contiguous
1333            match self
1334                .try_alloc_from_rb_slice(&rt_entry, &cls, curr_cnt, done != 0)
1335                .await?
1336            {
1337                Some(off) => {
1338                    //println!("alloc: done {} off {} cnt {}", done, off.0, off.1);
1339                    if done == 0 {
1340                        out_off = off.0;
1341                    } else {
                        // can't build one big contiguous range; free the small part
                        // from the previous loop and retry from the current host_cluster
1344                        if host_cluster != off.0 {
1345                            log::debug!(
1346                                "try_allocate_from: fragment found and retry, free ({:x} {}) ({:x} {})",
1347                                out_off,
1348                                done,
1349                                off.0,
1350                                off.1
1351                            );
1352                            self.free_clusters(out_off, done).await?;
1353                            self.free_clusters(off.0, off.1).await?;
1354                            done = 0;
1355                            count = alloc_cnt;
1356                            continue;
1357                        }
1358                    }
1359                    host_cluster = off.0 + ((off.1 as u64) << info.cluster_bits());
1360                    count -= off.1;
1361                    done += off.1;
1362                }
1363                None => {
                    // nothing allocated yet, so try the next slice since nothing is
                    // available from the current one; otherwise return what was allocated
1366                    if done == 0 {
1367                        host_cluster = cls.rb_slice_host_end(info);
1368                    } else {
1369                        break;
1370                    }
1371                }
1372            }
1373        }
1374
1375        if done != 0 {
1376            Ok(Some((out_off, done)))
1377        } else {
1378            Ok(None)
1379        }
1380    }
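    // Worked example (illustrative, default geometry with rb_slice_entries =
    // 2048, assuming the clusters are free): asking for 3000 clusters at a
    // slice boundary first takes up to 2048 clusters from that slice
    // (non-fixed), then asks the next slice for the remaining 952 with a
    // fixed count; if that piece does not start right where the first one
    // ended, both pieces are freed and the loop retries from the current
    // host_cluster.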
1381
    /// Allocate clusters; for now the allocation can't cross a refblock boundary,
    /// so fewer clusters than requested may be returned.
1384    async fn allocate_clusters(&self, count: usize) -> Qcow2Result<Option<(u64, usize)>> {
1385        let info = &self.info;
1386        let mut host_offset = self.free_cluster_offset.load(Ordering::Relaxed);
1387
1388        loop {
1389            match self.try_allocate_from(host_offset, count).await? {
1390                Some(a) => {
1391                    if count == 1 {
1392                        // Update the free cluster index only for `count == 1`, because otherwise
1393                        // (`count > 1`) we might have the index skip holes where single clusters
1394                        // could still fit
1395                        self.free_cluster_offset
1396                            .fetch_max(a.0 + info.cluster_size() as u64, Ordering::Relaxed);
1397                    }
1398
1399                    log::debug!(
1400                        "allocate_clusters: requested {:x}/{} allocated {:x} {:x}/{}",
1401                        count,
1402                        count,
1403                        a.0,
1404                        a.1,
1405                        a.1
1406                    );
1407
1408                    return Ok(Some(a));
1409                }
1410
1411                None => {
1412                    host_offset = HostCluster(host_offset).rb_host_end(info);
1413                }
1414            }
1415        }
1416    }
1417
1418    async fn allocate_cluster(&self) -> Qcow2Result<Option<(u64, usize)>> {
1419        self.allocate_clusters(1).await
1420    }
1421
1422    async fn count_rb_slice_alloc_clusters(
1423        &self,
1424        rt_e: &RefTableEntry,
1425        cls: &HostCluster,
1426    ) -> Qcow2Result<usize> {
1427        let rb_h = self.get_refblock(cls, rt_e).await?;
1428        let rb = rb_h.value().write().await;
1429        let mut total = 0;
1430
1431        for i in 0..rb.entries() {
1432            if !rb.get(i).is_zero() {
1433                total += 1;
1434            }
1435        }
1436
1437        Ok(total)
1438    }
1439
1440    async fn count_rt_entry_alloc_clusters(&self, cls: &HostCluster) -> Qcow2Result<Option<usize>> {
1441        let info = &self.info;
1442        let rt = self.reftable.read().await;
1443        let rt_idx = cls.rt_index(info);
1444        let rt_e = rt.get(rt_idx);
1445        let mut offset = cls.0;
1446        let mut total = 0;
1447
1448        if rt_e.is_zero() {
1449            return Ok(None);
1450        }
1451
1452        for _ in 0..(1 << (info.cluster_bits() - info.rb_slice_bits as usize)) {
1453            let cls = HostCluster(offset);
1454            let cnt = self.count_rb_slice_alloc_clusters(&rt_e, &cls).await?;
1455
1456            total += cnt;
1457            offset += (info.rb_slice_entries() as u64) << info.cluster_bits();
1458        }
1459
1460        Ok(Some(total))
1461    }
1462
1463    #[allow(dead_code)]
1464    pub(crate) async fn count_alloc_clusters(&self) -> Qcow2Result<usize> {
1465        let info = &self.info;
1466        let mut offset = 0_u64;
1467        let mut total = 0;
1468
1469        loop {
1470            let cls = HostCluster(offset);
1471
1472            match self.count_rt_entry_alloc_clusters(&cls).await? {
1473                Some(res) => total += res,
1474                None => break,
1475            }
1476
1477            offset += (info.rb_entries() as u64) << info.cluster_bits();
1478        }
1479
1480        Ok(total)
1481    }
1482
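    /// Look up the host mapping for a guest (virtual) offset.
    ///
    /// A minimal usage sketch (assuming an already opened `dev`; not taken
    /// from the original sources):
    ///
    /// ```ignore
    /// let mapping = dev.get_mapping(0).await?;
    /// match mapping.cluster_offset {
    ///     Some(host_off) => println!("guest 0x0 maps to host {:#x}", host_off),
    ///     None => println!("guest 0x0 has no allocated host cluster"),
    /// }
    /// ```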
1483    pub async fn get_mapping(&self, virtual_offset: u64) -> Qcow2Result<Mapping> {
1484        let split = SplitGuestOffset(virtual_offset & !(self.info.in_cluster_offset_mask as u64));
1485        let entry = self.get_l2_entry(virtual_offset).await?;
1486
1487        Ok(entry.into_mapping(&self.info, &split))
1488    }
1489
1490    #[inline]
1491    async fn get_l2_entry(&self, virtual_offset: u64) -> Qcow2Result<L2Entry> {
1492        let info = &self.info;
1493        let split = SplitGuestOffset(virtual_offset);
1494        let key = split.l2_slice_key(info);
1495
1496        // fast path
1497        if let Some(res) = self.l2cache.get(key) {
1498            let l2_slice = res.value().read().await;
1499            Ok(l2_slice.get_entry(info, &split))
1500        } else {
1501            let l1_entry = self.get_l1_entry(&split).await?;
1502
1503            if l1_entry.is_zero() {
1504                Ok(L2Entry(0))
1505            } else {
1506                let entry = self.get_l2_slice_slow(&l1_entry, &split).await?;
1507                let l2_slice = entry.value().read().await;
1508                Ok(l2_slice.get_entry(info, &split))
1509            }
1510        }
1511    }
1512
1513    #[inline]
1514    async fn get_l2_entries(&self, off: u64, len: usize) -> Qcow2Result<Vec<L2Entry>> {
1515        let info = &self.info;
1516        let start = off & (!(info.in_cluster_offset_mask) as u64);
1517        let end = (off + ((len + info.cluster_size()) as u64) - 1)
1518            & (!(info.in_cluster_offset_mask) as u64);
1519        let mut entries = Vec::new();
1520        let mut voff = start;
1521
1522        while voff < end {
1523            let split = SplitGuestOffset(voff);
1524            let key = split.l2_slice_key(info);
1525
1526            // fast path
1527            let l2_slice = match self.l2cache.get(key) {
1528                Some(res) => res.value().read().await,
1529                None => {
1530                    let l1_entry = self.get_l1_entry(&split).await?;
1531
1532                    if l1_entry.is_zero() {
1533                        entries.push(L2Entry(0));
1534                        voff += info.cluster_size() as u64;
1535                        continue;
1536                    } else {
1537                        let entry = self.get_l2_slice_slow(&l1_entry, &split).await?;
1538                        entry.value().read().await
1539                    }
1540                }
1541            };
1542
1543            let this_end = {
1544                let l2_slice_idx = split.l2_slice_index(info) as u32;
1545                std::cmp::min(
1546                    end,
1547                    voff + (((info.l2_slice_entries - l2_slice_idx) as u64) << info.cluster_bits()),
1548                )
1549            };
1550
1551            for this_off in (voff..this_end).step_by(info.cluster_size()) {
1552                let s = SplitGuestOffset(this_off);
1553                entries.push(l2_slice.get_entry(info, &s));
1554            }
1555            voff = this_end;
1556        }
1557
1558        Ok(entries)
1559    }
1560
1561    async fn do_read_compressed(
1562        &self,
1563        mapping: Mapping,
1564        off_in_cls: usize,
1565        buf: &mut [u8],
1566    ) -> Qcow2Result<usize> {
1567        let info = &self.info;
1568        let compressed_offset = mapping.cluster_offset.unwrap();
1569        let compressed_length = mapping.compressed_length.unwrap();
1570
1571        // to support direct IO (dio), we have to issue aligned IO
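        // e.g. with a 512-byte block size, a compressed extent at host offset
        // 0x1234 with length 300 becomes one aligned read: aligned_off is
        // 0x1200, pad is 0x34, aligned_len is 512, and the compressed bytes
        // are then taken from the padded buffer at [pad..pad + 300]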
1572        let bs = 1 << info.block_size_shift;
1573        let bs_mask = !((1_u64 << info.block_size_shift) - 1);
1574        let aligned_off = compressed_offset & bs_mask;
1575        let pad = (compressed_offset - aligned_off) as usize;
1576        let aligned_len = (pad + compressed_length + bs - 1) & (bs_mask as usize);
1577
1578        let mut _compressed_data = Qcow2IoBuf::<u8>::new(aligned_len);
1579        let res = self.call_read(aligned_off, &mut _compressed_data).await?;
1580        if res != aligned_len {
1581            return Err("do_read_compressed: short read compressed data".into());
1582        }
1583        let compressed_data = &_compressed_data[pad..(pad + compressed_length)];
1584
1585        let mut dec_ox = DecompressorOxide::new();
1586        if buf.len() == info.cluster_size() {
1587            let (status, _read, _written) = inflate(&mut dec_ox, compressed_data, buf, 0, 0);
1588            if status != TINFLStatus::Done && status != TINFLStatus::HasMoreOutput {
1589                return Err(format!(
1590                    "Failed to decompress cluster (host offset 0x{:x}+{}): {:?}",
1591                    compressed_offset, compressed_length, status
1592                )
1593                .into());
1594            }
1595        } else {
1596            let mut uncompressed_data = vec![0; info.cluster_size()];
1597
1598            let (status, _read, _written) =
1599                inflate(&mut dec_ox, compressed_data, &mut uncompressed_data, 0, 0);
1600            if status != TINFLStatus::Done && status != TINFLStatus::HasMoreOutput {
1601                return Err(format!(
1602                    "Failed to decompress cluster (host offset 0x{:x}+{}): {:?}",
1603                    compressed_offset, compressed_length, status
1604                )
1605                .into());
1606            }
1607            buf.copy_from_slice(&uncompressed_data[off_in_cls..(off_in_cls + buf.len())]);
1608        };
1609
1610        Ok(buf.len())
1611    }
1612
1613    #[inline]
1614    async fn do_read_backing(
1615        &self,
1616        mapping: Mapping,
1617        off_in_cls: usize,
1618        buf: &mut [u8],
1619    ) -> Qcow2Result<usize> {
1620        match self.backing_file.as_ref() {
1621            Some(backing) => match mapping.cluster_offset {
1622                Some(off) => {
1623                    backing
1624                        .read_at_for_backing(buf, off + off_in_cls as u64)
1625                        .await
1626                }
1627                None => Err("Backing mapping: None offset None".into()),
1628            },
1629            None => {
1630                zero_buf!(buf);
1631                Ok(buf.len())
1632            }
1633        }
1634    }
1635
1636    #[inline]
1637    async fn do_read_zero(
1638        &self,
1639        _mapping: Mapping,
1640        _off_in_cls: usize,
1641        buf: &mut [u8],
1642    ) -> Qcow2Result<usize> {
1643        zero_buf!(buf);
1644        Ok(buf.len())
1645    }
1646
1647    #[inline]
1648    async fn do_read_data_file(
1649        &self,
1650        mapping: Mapping,
1651        off_in_cls: usize,
1652        buf: &mut [u8],
1653    ) -> Qcow2Result<usize> {
1654        match mapping.cluster_offset {
1655            Some(off) => self.call_read(off + off_in_cls as u64, buf).await,
1656            None => Err("DataFile mapping: None offset None".into()),
1657        }
1658    }
1659
1660    #[inline]
1661    async fn do_read(&self, entry: L2Entry, offset: u64, buf: &mut [u8]) -> Qcow2Result<usize> {
1662        let off_in_cls = (offset as usize) & self.info.in_cluster_offset_mask;
1663        let split = SplitGuestOffset(offset - (off_in_cls as u64));
1664        let mapping = entry.into_mapping(&self.info, &split);
1665
1666        log::trace!(
1667            "do_read: {} off_in_cls {} len {}",
1668            mapping,
1669            off_in_cls,
1670            buf.len()
1671        );
1672        match mapping.source {
1673            MappingSource::DataFile => self.do_read_data_file(mapping, off_in_cls, buf).await,
1674            MappingSource::Zero | MappingSource::Unallocated => {
1675                let mapping = self.get_mapping(offset).await?;
1676                self.do_read_zero(mapping, off_in_cls, buf).await
1677            }
1678            MappingSource::Backing => {
1679                let mapping = self.get_mapping(offset).await?;
1680                self.do_read_backing(mapping, off_in_cls, buf).await
1681            }
1682            MappingSource::Compressed => self.do_read_compressed(mapping, off_in_cls, buf).await,
1683        }
1684    }
1685
1686    #[inline]
1687    async fn __read_at(&self, buf: &mut [u8], mut offset: u64) -> Qcow2Result<usize> {
1688        let info = &self.info;
1689        let bs = 1 << info.block_size_shift;
1690        let bs_mask = bs - 1;
1691        let vsize = info.virtual_size();
1692        let mut len = buf.len();
1693        let old_offset = offset;
1694        let old_len = len;
1695        let single =
1696            (offset >> info.cluster_bits()) == ((offset + (len as u64) - 1) >> info.cluster_bits());
1697
1698        if offset >= vsize {
1699            if !info.is_back_file() {
1700                return Err("read_at eof".into());
1701            } else {
1702                // the top device is asking for a read, which is usually
1703                // caused by a top device resize, so simply pretend we
1704                // provided the requested data
1705                return Ok(buf.len());
1706            }
1707        }
1708
1709        if len == 0 {
1710            return Ok(0);
1711        }
1712
1713        if (len & bs_mask) != 0 {
1714            return Err("un-aligned buffer length".into());
1715        }
1716
1717        if (offset & (bs_mask as u64)) != 0 {
1718            return Err("un-aligned offset".into());
1719        }
1720
1721        log::debug!("read_at: offset {:x} len {} >>>", offset, buf.len());
1722
1723        let extra = if offset + (len as u64) > vsize {
1724            len = ((offset + (len as u64) - vsize + bs as u64 - 1) as usize) & !bs_mask;
1725            if info.is_back_file() {
1726                buf.len() - len
1727            } else {
1728                0
1729            }
1730        } else {
1731            0
1732        };
1733
1734        debug_assert!((len & bs_mask) == 0);
1735
1736        let done = if single {
1737            let l2_entry = self.get_l2_entry(offset).await?;
1738
1739            self.do_read(l2_entry, offset, buf).await?
1740        } else {
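            // multi-cluster read: resolve all l2 entries first, split the
            // buffer at cluster boundaries, then issue the per-cluster reads
            // concurrently and add up the completed bytes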
1741            let mut reads = Vec::new();
1742            let mut remain = buf;
1743            let mut first_len = 0;
1744            let mut last_len = 0;
1745            let mut idx = 0;
1746            let mut s = 0;
1747            let l2_entries = self.get_l2_entries(offset, len).await?;
1748
1749            while len > 0 {
1750                let in_cluster_offset = offset as usize & info.in_cluster_offset_mask;
1751                let curr_len = std::cmp::min(info.cluster_size() - in_cluster_offset, len);
1752                let (iobuf, b) = remain.split_at_mut(curr_len);
1753                remain = b;
1754
1755                if first_len == 0 {
1756                    first_len = curr_len;
1757                }
1758
1759                reads.push(self.do_read(l2_entries[idx], offset, iobuf));
1760
1761                offset += curr_len as u64;
1762                len -= curr_len;
1763                if len == 0 {
1764                    last_len = curr_len;
1765                }
1766                idx += 1;
1767            }
1768
1769            let res = futures::future::join_all(reads).await;
1770            for i in 0..res.len() {
1771                let exp = if i == 0 {
1772                    first_len
1773                } else if i == res.len() - 1 {
1774                    last_len
1775                } else {
1776                    info.cluster_size()
1777                };
1778
1779                match res[i] {
1780                    Ok(r) => {
1781                        s += r;
1782                        if r != exp {
1783                            break;
1784                        }
1785                    }
1786                    Err(_) => break,
1787                };
1788            }
1789            s
1790        };
1791
1792        log::debug!(
1793            "read_at: offset {:x} len {} res {} <<<",
1794            old_offset,
1795            old_len,
1796            done
1797        );
1798        Ok(done + extra)
1799    }
1800
1801    #[async_recursion(?Send)]
1802    async fn read_at_for_backing(&self, buf: &mut [u8], offset: u64) -> Qcow2Result<usize> {
1803        self.__read_at(buf, offset).await
1804    }
1805
1806    /// Read data to `buf` from the virtual `offset` of this qcow2 image
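    ///
    /// Both `offset` and the buffer length have to be aligned to the device
    /// block size. A minimal usage sketch (not compiled here; `dev` is an
    /// opened device, e.g. from `qcow2_setup_dev_tokio()` in the tests below):
    ///
    /// ```ignore
    /// // read one 4KB block from virtual offset 0
    /// let mut buf = Qcow2IoBuf::<u8>::new(4096);
    /// let done = dev.read_at(&mut buf, 0).await?;
    /// assert!(done <= buf.len());
    /// ```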
1807    pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Qcow2Result<usize> {
1808        self.__read_at(buf, offset).await
1809    }
1810
1811    async fn flush_header_for_l1_table(
1812        &self,
1813        l1_offset: u64,
1814        l1_entries: usize,
1815    ) -> Qcow2Result<()> {
1816        let info = &self.info;
1817
1818        log::info!(
1819            "ensure_l2_offset: flush header for updating l1 offset {:x} entries {}",
1820            l1_offset,
1821            l1_entries
1822        );
1823
1824        assert!(info.in_cluster_offset(l1_offset) == 0);
1825        assert!(l1_entries <= info.max_l1_entries());
1826
1827        let mut h = self.header.write().await;
1828        let old_entries = h.l1_table_entries();
1829        let old_offset = h.l1_table_offset();
1830
1831        h.set_l1_table(l1_offset, l1_entries)?;
1832        let buf = h.serialize_to_buf()?;
1833        if let Err(err) = self.call_write(0, &buf).await {
1834            h.set_l1_table(old_offset, old_entries).unwrap();
1835            return Err(err);
1836        }
1837        Ok(())
1838    }
1839
1840    /// fill up the l1 entry, growing the l1 table if needed
1841    async fn ensure_l2_offset(&self, split: &SplitGuestOffset) -> Qcow2Result<L1Entry> {
1842        let info = &self.info;
1843        let l1_entry = self.get_l1_entry(split).await?;
1844        if !l1_entry.is_zero() {
1845            return Ok(l1_entry);
1846        }
1847
1848        let l1_index = split.l1_index(info);
1849        let mut l1_table = self.l1table.write().await;
1850
1851        // check if the current index is within the bounds of the header's l1 entries
1852        if !l1_table.in_bounds(l1_index) {
1853            if l1_index >= l1_table.entries() {
1854                let old_l1_offset = l1_table.get_offset().unwrap();
1855                let old_l1_clusters = l1_table.cluster_count(info);
1856
1857                let mut new_l1_table = l1_table.clone_and_grow(l1_index, info.cluster_size());
1858                let new_l1_clusters = new_l1_table.cluster_count(info);
1859                let allocated = self.allocate_clusters(new_l1_clusters).await.unwrap();
1860
1861                // FIXME: allocate_clusters() may return fewer clusters
1862                // than requested; that case has to be covered here
1863                match allocated {
1864                    None => return Err("nothing allocated for new l1 table".into()),
1865                    Some(res) => {
1866                        log::info!("ensure_l2_offset: write new allocated l1 table");
1867                        self.flush_refcount().await?;
1868                        self.flush_mapping(&l1_table).await?;
1869                        new_l1_table.set_offset(Some(res.0));
1870                        self.flush_top_table(&new_l1_table).await?;
1871
1872                        self.flush_header_for_l1_table(res.0, new_l1_table.entries())
1873                            .await?;
1874                    }
1875                };
1876
1877                *l1_table = new_l1_table;
1878                self.free_clusters(old_l1_offset, old_l1_clusters).await?;
1879            } else {
1880                let l1_off = {
1881                    let h = self.header.read().await;
1882                    h.l1_table_offset()
1883                };
1884                let l1_entries = std::cmp::min(info.max_l1_entries(), l1_table.entries());
1885
1886                // update l1 entries
1887                self.flush_header_for_l1_table(l1_off, l1_entries).await?;
1888                l1_table.update_header_entries(l1_entries.try_into().unwrap());
1889            }
1890        }
1891
1892        // Retry before allocating, maybe something has changed in the meantime
1893        let l1_e = l1_table.get(l1_index);
1894        if !l1_e.is_zero() {
1895            return Ok(l1_e);
1896        }
1897
1898        let allocated = self.allocate_cluster().await?;
1899        match allocated {
1900            Some(res) => {
1901                let l2_offset = res.0;
1902
1903                // this is one new cluster
1904                self.mark_new_cluster(l2_offset >> info.cluster_bits())
1905                    .await;
1906                l1_table.map_l2_offset(l1_index, l2_offset);
1907                self.mark_need_flush(true);
1908
1909                Ok(l1_table.get(l1_index))
1910            }
1911            None => Err("nothing allocated for l2 table".into()),
1912        }
1913    }
1914
1915    #[inline(always)]
1916    async fn do_compressed_cow(
1917        &self,
1918        off_in_cls: usize,
1919        buf: &[u8],
1920        host_off: u64,
1921        compressed_mapping: &Mapping,
1922    ) -> Qcow2Result<()> {
1923        let mut cbuf = Qcow2IoBuf::<u8>::new(self.info.cluster_size());
1924
1925        // copy & write
1926        self.do_read_compressed(compressed_mapping.clone(), 0, &mut cbuf)
1927            .await?;
1928        cbuf[off_in_cls..off_in_cls + buf.len()].copy_from_slice(buf);
1929        self.call_write(host_off, &cbuf).await
1930    }
1931
1932    #[inline(always)]
1933    async fn do_back_cow(
1934        &self,
1935        virt_off: u64,
1936        off_in_cls: usize,
1937        buf: &[u8],
1938        host_off: u64,
1939    ) -> Qcow2Result<()> {
1940        match self.backing_file.as_ref() {
1941            Some(backing) => {
1942                let mut cbuf = Qcow2IoBuf::<u8>::new(self.info.cluster_size());
1943
1944                // copy & write
1945                backing
1946                    .read_at(&mut cbuf, virt_off - (off_in_cls as u64))
1947                    .await?;
1948                cbuf[off_in_cls..off_in_cls + buf.len()].copy_from_slice(buf);
1949                self.call_write(host_off, &cbuf).await
1950            }
1951            None => Err("No backing device found for COW".into()),
1952        }
1953    }
1954
1955    /// discard this range only if the host cluster it points to is newly allocated
1956    #[inline]
1957    async fn do_write_data_file(
1958        &self,
1959        virt_off: u64,
1960        mapping: &Mapping,
1961        cow_mapping: Option<&Mapping>,
1962        buf: &[u8],
1963    ) -> Qcow2Result<()> {
1964        let info = &self.info;
1965        let off_in_cls = (virt_off & (info.in_cluster_offset_mask as u64)) as usize;
1966        let may_cow = cow_mapping.is_some();
1967
1968        let host_off = match mapping.cluster_offset {
1969            Some(off) => off,
1970            None => return Err("DataFile mapping: None offset None".into()),
1971        };
1972
1973        log::trace!(
1974            "do_write_data_file off_in_cls {:x} len {} virt_off {:x} cow {} mapping {}",
1975            off_in_cls,
1976            buf.len(),
1977            virt_off,
1978            may_cow,
1979            &mapping,
1980        );
1981
1982        let f_write = self.call_write(host_off + off_in_cls as u64, buf);
1983        let key = host_off >> info.cluster_bits();
1984
1985        let mut discard = None;
1986        let cluster_lock = if self.cluster_is_new(key).await {
1987            let cls_map = self.new_cluster.read().await;
1988            // keep this cluster locked, so that concurrent discard can
1989            // be avoided
1990
1991            if cls_map.contains_key(&key) {
1992                let mut lock = cls_map.get(&key).unwrap().write().await;
1993
1994                // don't handle discard if someone else has already done
1995                // it; otherwise mark this cluster as being handled.
1996                //
1997                // this per-cluster lock also covers backing COW: the whole
1998                // cluster is copied to the top image while the write lock
1999                // is held, so any concurrent write has to start after the
2000                // copy is done
2001                if !(*lock) {
2002                    *lock = true;
2003
2004                    discard = Some(self.call_fallocate(host_off, info.cluster_size(), 0));
2005                    Some(lock)
2006                } else {
2007                    None
2008                }
2009            } else {
2010                None
2011            }
2012        } else {
2013            None
2014        };
2015
2016        if let Some(lock) = cluster_lock {
2017            if let Some(df) = discard {
2018                df.await?
2019            }
2020
2021            let cow_res = match cow_mapping {
2022                None => Ok(()),
2023                Some(m) => match m.source {
2024                    MappingSource::Compressed => {
2025                        self.do_compressed_cow(off_in_cls, buf, host_off, m).await
2026                    }
2027                    MappingSource::Backing => {
2028                        self.do_back_cow(virt_off, off_in_cls, buf, host_off).await
2029                    }
2030                    _ => Ok(()),
2031                },
2032            };
2033
2034            /*
2035             * Another write on this new cluster may hold the read lock
2036             * and block us, so drop the write lock first, given we have
2037             * already marked that this new cluster is being handled.
2038             */
2039            drop(lock);
2040            self.clear_new_cluster(key).await;
2041            if may_cow {
2042                // make sure data flushed before updating mapping
2043                self.call_fsync(host_off, info.cluster_size(), 0).await?;
2044                return cow_res;
2045            }
2046        };
2047
2048        f_write.await
2049    }
2050
2051    async fn do_write_cow(&self, off: u64, mapping: &Mapping, buf: &[u8]) -> Qcow2Result<()> {
2052        let info = &self.info;
2053        let split = SplitGuestOffset(off);
2054        let compressed = mapping.source == MappingSource::Compressed;
2055
2056        log::trace!(
2057            "do_write_cow off_in_cls {:x} len {} mapping {}",
2058            off,
2059            buf.len(),
2060            &mapping,
2061        );
2062
2063        // a compressed cluster already has its l1 entry ready, but a backing dev may not
2064        if !compressed {
2065            let _ = self.ensure_l2_offset(&split).await?;
2066        }
2067        let l2_handle = self.get_l2_slice(&split).await?;
2068
2069        // hold l2_table write lock, so that new mapping won't be flushed
2070        // to disk until cow is done
2071        let mut l2_table = l2_handle.value().write().await;
2072
2073        // someone else may jump on this cluster at the same time, so let
2074        // only _one_ of them handle COW for the compressed image
2075        let data_mapping = match l2_table.get_mapping(info, &split).source {
2076            MappingSource::Compressed | MappingSource::Backing => {
2077                let mapping = self.alloc_and_map_cluster(&split, &mut l2_table).await?;
2078
2079                l2_handle.set_dirty(true);
2080                self.mark_need_flush(true);
2081
2082                mapping
2083            }
2084            _ => {
2085                drop(l2_table);
2086                return self.write_at_for_cow(buf, off).await;
2087            }
2088        };
2089
2090        match self
2091            .do_write_data_file(off, &data_mapping, Some(mapping), buf)
2092            .await
2093        {
2094            Err(e) => {
2095                log::error!("do_write_cow: data write failed");
2096                // recover to previous compressed mapping & free allocated
2097                // clusters
2098                let allocated_cls = data_mapping.cluster_offset.unwrap();
2099                self.free_clusters(allocated_cls, 1).await?;
2100                self.clear_new_cluster(allocated_cls >> info.cluster_bits())
2101                    .await;
2102
2103                l2_table.set(
2104                    split.l2_slice_index(info),
2105                    crate::meta::L2Entry::from_mapping(mapping.clone(), info.cluster_bits() as u32),
2106                );
2107
2108                Err(e)
2109            }
2110            Ok(_) => {
2111                // respect meta update order, flush refcount meta,
2112                // then flush this l2 table, then decrease the
2113                // old cluster's reference count in ram
2114
2115                // flush refcount change, which is often small
2116                // change
2117                self.flush_refcount().await?;
2118
2119                // flush mapping table in-place update
2120                let off = l2_table.get_offset().unwrap();
2121                let buf =
2122                    unsafe { std::slice::from_raw_parts(l2_table.as_ptr(), l2_table.byte_size()) };
2123                self.call_write(off, buf).await?;
2124                l2_handle.set_dirty(false);
2125
2126                // release l2 table, so that this new mapping can be flushed
2127                // to disk
2128                drop(l2_table);
2129
2130                if compressed {
2131                    // finally free the clusters of the original compressed
2132                    // mapping; this update needn't be flushed immediately
2133                    // and can stay in ram
2134                    let l2_e = crate::meta::L2Entry::from_mapping(
2135                        mapping.clone(),
2136                        info.cluster_bits() as u32,
2137                    );
2138                    match l2_e.compressed_range(info.cluster_bits() as u32) {
2139                        Some((off, length)) => {
2140                            let mask = (!info.in_cluster_offset_mask) as u64;
2141                            let start = off & mask;
2142                            let end = (off + (length as u64)) & mask;
2143
2144                            let cnt = (((end - start) as usize) >> info.cluster_bits()) + 1;
2145                            self.free_clusters(start, cnt).await?
2146                        }
2147                        None => {
2148                            eprintln!("compressed clusters leak caused by wrong mapping")
2149                        }
2150                    }
2151                }
2152
2153                Ok(())
2154            }
2155        }
2156    }
2157
2158    #[inline]
2159    async fn alloc_and_map_cluster(
2160        &self,
2161        split: &SplitGuestOffset,
2162        l2_table: &mut LockWriteGuard<L2Table>,
2163    ) -> Qcow2Result<Mapping> {
2164        let info = &self.info;
2165        let allocated = self.allocate_cluster().await?;
2166        match allocated {
2167            Some(res) => {
2168                let l2_offset = res.0;
2169
2170                // this is one new cluster
2171                self.mark_new_cluster(l2_offset >> info.cluster_bits())
2172                    .await;
2173
2174                let _ = l2_table.map_cluster(split.l2_slice_index(info), l2_offset);
2175                Ok(l2_table.get_mapping(info, split))
2176            }
2177            None => Err("DataFile mapping: None offset None".into()),
2178        }
2179    }
2180
2181    #[inline]
2182    async fn make_single_write_mapping(&self, virt_off: u64) -> Qcow2Result<L2Entry> {
2183        let split = SplitGuestOffset(virt_off);
2184        let _ = self.ensure_l2_offset(&split).await?;
2185        let l2_handle = self.get_l2_slice(&split).await?;
2186        let mut l2_table = l2_handle.value().write().await;
2187
2188        let mapping = l2_table.get_mapping(&self.info, &split);
2189        if mapping.plain_offset(0).is_none() {
2190            let _ = self.alloc_and_map_cluster(&split, &mut l2_table).await?;
2191            l2_handle.set_dirty(true);
2192            self.mark_need_flush(true);
2193        }
2194        Ok(l2_table.get_entry(&self.info, &split))
2195    }
2196
2197    /// don't pre-populate the mapping for backing & compressed COW; the
2198    /// mapping must not be updated until the copy-on-write is completed,
2199    /// otherwise data loss may be caused.
2200    fn need_make_mapping(mapping: &Mapping, info: &Qcow2Info) -> bool {
2201        if mapping.plain_offset(0).is_some() {
2202            return false;
2203        }
2204
2205        if mapping.source == MappingSource::Compressed {
2206            return false;
2207        }
2208
2209        if info.has_back_file()
2210            && (mapping.source == MappingSource::Backing
2211                || mapping.source == MappingSource::Unallocated)
2212        {
2213            return false;
2214        }
2215
2216        true
2217    }
2218
2219    /// return how many l2 entries are stored in `l2_entries`
2220    #[inline]
2221    async fn __make_multiple_write_mapping(
2222        &self,
2223        start: u64,
2224        end: u64,
2225        l2_entries: &mut Vec<L2Entry>,
2226    ) -> Qcow2Result<usize> {
2227        let info = &self.info;
2228        let cls_size = info.cluster_size() as u64;
2229
2230        debug_assert!((start & (cls_size - 1)) == 0);
2231
2232        let split = SplitGuestOffset(start);
2233        let _ = self.ensure_l2_offset(&split).await?;
2234        let l2_handle = self.get_l2_slice(&split).await?;
2235        let mut l2_table = l2_handle.value().write().await;
2236
2237        // handle just one l2 slice at a time, so the write lock is
2238        // only acquired once
2239        let end = {
2240            let l2_slice_idx = split.l2_slice_index(info) as u32;
2241            std::cmp::min(
2242                end,
2243                start + (((info.l2_slice_entries - l2_slice_idx) as u64) << info.cluster_bits()),
2244            )
2245        };
2246
2247        // figure out how many clusters to allocate for write
2248        let mut nr_clusters = 0;
2249        for this_off in (start..end).step_by(cls_size as usize) {
2250            let s = SplitGuestOffset(this_off);
2251            let mapping = l2_table.get_mapping(&self.info, &s);
2252
2253            if Self::need_make_mapping(&mapping, info) {
2254                nr_clusters += 1
2255            }
2256        }
2257
2258        if nr_clusters == 0 {
2259            for this_off in (start..end).step_by(cls_size as usize) {
2260                let s = SplitGuestOffset(this_off);
2261                let entry = l2_table.get_entry(info, &s);
2262                l2_entries.push(entry);
2263            }
2264            return Ok(((end - start) as usize) >> info.cluster_bits());
2265        }
2266
2267        let (cluster_start, cluster_cnt) = match self.allocate_clusters(nr_clusters).await? {
2268            Some((s, c)) => (s, c),
2269            _ => self
2270                .allocate_cluster()
2271                .await
2272                .unwrap()
2273                .expect("ran out of clusters"),
2274        };
2275
2276        let mut this_off = start;
2277        let done = if cluster_cnt > 0 {
2278            // how many mappings are updated
2279            let mut idx = 0;
2280
2281            while this_off < end {
2282                let split = SplitGuestOffset(this_off);
2283                let entry = l2_table.get_entry(info, &split);
2284                let mapping = entry.into_mapping(info, &split);
2285
2286                if Self::need_make_mapping(&mapping, info) {
2287                    let l2_off = cluster_start + ((idx as u64) << info.cluster_bits());
2288
2289                    // this is one new cluster
2290                    self.mark_new_cluster(l2_off >> info.cluster_bits()).await;
2291                    let _ = l2_table.map_cluster(split.l2_slice_index(info), l2_off);
2292
2293                    // load the new entry
2294                    let entry = l2_table.get_entry(info, &split);
2295                    l2_entries.push(entry);
2296                    idx += 1;
2297                } else {
2298                    l2_entries.push(entry)
2299                }
2300
2301                this_off += cls_size;
2302                if idx >= cluster_cnt {
2303                    break;
2304                }
2305            }
2306            idx
2307        } else {
2308            0
2309        };
2310
2311        if done > 0 {
2312            l2_handle.set_dirty(true);
2313            self.mark_need_flush(true);
2314        }
2315
2316        Ok(((this_off - start) as usize) >> info.cluster_bits())
2317    }
2318
2319    async fn make_multiple_write_mappings(
2320        &self,
2321        mut start: u64,
2322        end: u64,
2323    ) -> Qcow2Result<Vec<L2Entry>> {
2324        let info = &self.info;
2325        let mut l2_entries = Vec::new();
2326        while start < end {
2327            // optimize in the future by getting l2 entries in batch
2328            let entry = self.get_l2_entry(start).await?;
2329
2330            let split = SplitGuestOffset(start);
2331            let mapping = entry.into_mapping(info, &split);
2332
2333            let done = if Self::need_make_mapping(&mapping, info) {
2334                self.__make_multiple_write_mapping(start, end, &mut l2_entries)
2335                    .await?
2336            } else {
2337                l2_entries.push(entry);
2338                1
2339            };
2340
2341            start += (done as u64) << info.cluster_bits();
2342        }
2343        Ok(l2_entries)
2344    }
2345
2346    async fn populate_single_write_mapping(&self, virt_off: u64) -> Qcow2Result<L2Entry> {
2347        let info = &self.info;
2348        let entry = self.get_l2_entry(virt_off).await?;
2349        let split = SplitGuestOffset(virt_off);
2350        let mapping = entry.into_mapping(info, &split);
2351
2352        let entry = if Self::need_make_mapping(&mapping, info) {
2353            self.make_single_write_mapping(virt_off).await?
2354        } else {
2355            entry
2356        };
2357
2358        Ok(entry)
2359    }
2360
2361    /// populate mappings for a write in batch; this may improve perf a
2362    /// lot for big sequential IO, because all the meta setup can be done
2363    /// in a single place, and then the data write IO can run concurrently
2364    /// without lock contention
2365    #[inline]
2366    async fn populate_write_mappings(
2367        &self,
2368        virt_off: u64,
2369        len: usize,
2370    ) -> Qcow2Result<Vec<L2Entry>> {
2371        let info = &self.info;
2372        let cls_size = info.cluster_size() as u64;
2373        let start = virt_off & !(cls_size - 1);
2374        let end = (virt_off + (len as u64) + cls_size - 1) & !(cls_size - 1);
2375
2376        let entries = self.make_multiple_write_mappings(start, end).await?;
2377
2378        Ok(entries)
2379    }
2380
2381    async fn do_write(&self, l2_e: L2Entry, off: u64, buf: &[u8]) -> Qcow2Result<()> {
2382        let info = &self.info;
2383        let split = SplitGuestOffset(off & !(info.in_cluster_offset_mask as u64));
2384        let mapping = l2_e.into_mapping(info, &split);
2385
2386        log::trace!(
2387            "do_write: offset {:x} len {} mapping {}",
2388            off,
2389            buf.len(),
2390            &mapping,
2391        );
2392
2393        match mapping.source {
2394            MappingSource::DataFile => self.do_write_data_file(off, &mapping, None, buf).await,
2395            MappingSource::Compressed => self.do_write_cow(off, &mapping, buf).await,
2396            MappingSource::Backing | MappingSource::Unallocated if info.has_back_file() => {
2397                self.do_write_cow(off, &mapping, buf).await
2398            }
2399            _ => {
2400                eprintln!(
2401                    "invalid mapping {:?}, has_back_file {} offset {:x} len {}",
2402                    mapping.source,
2403                    info.has_back_file(),
2404                    off,
2405                    buf.len()
2406                );
2407                Err("invalid mapping built".into())
2408            }
2409        }
2410    }
2411
2412    #[inline]
2413    async fn __write_at(&self, buf: &[u8], mut offset: u64) -> Qcow2Result<()> {
2414        use futures::stream::{FuturesUnordered, StreamExt};
2415
2416        let info = &self.info;
2417        let bs = 1 << info.block_size_shift;
2418        let bs_mask = bs - 1;
2419        let mut len = buf.len();
2420        let old_offset = offset;
2421        let single =
2422            (offset >> info.cluster_bits()) == ((offset + (len as u64) - 1) >> info.cluster_bits());
2423
2424        log::debug!("write_at offset {:x} len {} >>>", offset, buf.len());
2425
2426        if offset
2427            .checked_add(buf.len() as u64)
2428            .map(|end| end > info.virtual_size())
2429            != Some(false)
2430        {
2431            return Err("Cannot write beyond the end of a qcow2 image".into());
2432        }
2433
2434        if (len & bs_mask) != 0 {
2435            return Err("write_at: un-aligned buffer length".into());
2436        }
2437
2438        if (offset & (bs_mask as u64)) != 0 {
2439            return Err("write_at: un-aligned offset".into());
2440        }
2441
2442        if info.is_read_only() {
2443            return Err("write_at: write to read-only image".into());
2444        }
2445
2446        if single {
2447            let l2_entry = self.populate_single_write_mapping(offset).await?;
2448            self.do_write(l2_entry, offset, buf).await?;
2449        } else {
2450            let writes = FuturesUnordered::new();
2451            let mut remain = buf;
2452            let mut idx = 0;
2453            let l2_entries = self.populate_write_mappings(offset, len).await?;
2454            while len > 0 {
2455                let in_cluster_offset = offset as usize & info.in_cluster_offset_mask;
2456                let curr_len = std::cmp::min(info.cluster_size() - in_cluster_offset, len);
2457                let (iobuf, b) = remain.split_at(curr_len);
2458                remain = b;
2459
2460                writes.push(self.do_write(l2_entries[idx], offset, iobuf));
2461
2462                offset += curr_len as u64;
2463                len -= curr_len;
2464                idx += 1;
2465            }
2466
2467            let res: Vec<_> = writes.collect().await;
2468            for r in res {
2469                if r.is_err() {
2470                    return Err("write_at: one write failed".into());
2471                }
2472            }
2473        }
2474
2475        log::debug!("write_at offset {:x} len {} <<<", old_offset, buf.len());
2476        Ok(())
2477    }
2478
2479    #[async_recursion(?Send)]
2480    async fn write_at_for_cow(&self, buf: &[u8], offset: u64) -> Qcow2Result<()> {
2481        self.__write_at(buf, offset).await
2482    }
2483
2484    /// Write data in `buf` to the virtual `offset` of this qcow2 image
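    ///
    /// `offset` and the buffer length have to be block-size aligned, and the
    /// image must not be read-only. A minimal usage sketch (not compiled
    /// here):
    ///
    /// ```ignore
    /// let mut buf = Qcow2IoBuf::<u8>::new(4096);
    /// // ... fill `buf` with the data to be written ...
    /// dev.write_at(&buf, 1 << 20).await?;
    ///
    /// // persist dirty mapping/refcount metadata
    /// if dev.need_flush_meta() {
    ///     dev.flush_meta().await?;
    /// }
    /// ```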
2485    pub async fn write_at(&self, buf: &[u8], offset: u64) -> Qcow2Result<()> {
2486        self.__write_at(buf, offset).await
2487    }
2488
2489    #[inline(always)]
2490    fn mark_need_flush(&self, val: bool) {
2491        self.need_flush.store(val, Ordering::Relaxed);
2492    }
2493
2494    /// Helper for checking if there is dirty meta data which needs
2495    /// to be flushed to disk
2496    #[inline]
2497    pub fn need_flush_meta(&self) -> bool {
2498        self.need_flush.load(Ordering::Relaxed)
2499    }
2500
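    // Track used host clusters as merged inclusive ranges: the map is keyed
    // by both endpoints of every range, so adding cluster `num` can coalesce
    // with a neighbouring range ending at `num - 1` and/or one starting at
    // `num + 1`. For example, adding 5, 7 and then 6 collapses into a single
    // 5..=7 range, and adding 8 afterwards extends it to 5..=8.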
2501    fn add_used_cluster_to_set(ranges: &mut HashMap<u64, RangeInclusive<u64>>, num: u64) {
2502        let mut start = num;
2503        let mut end = num;
2504
2505        if num > 0 {
2506            if let Some(range) = ranges.remove(&(num - 1)) {
2507                start = *range.start();
2508                ranges.remove(&start);
2509            }
2510        }
2511
2512        if let Some(range) = ranges.remove(&(num + 1)) {
2513            end = *range.end();
2514            ranges.remove(&end);
2515        }
2516
2517        if let Some(range) = ranges.remove(&num) {
2518            start = start.min(*range.start());
2519            end = end.max(*range.end());
2520        }
2521
2522        ranges.insert(start, start..=end);
2523        ranges.insert(end, start..=end);
2524    }
2525
2526    async fn add_table_clusters<B: Table>(
2527        &self,
2528        table: &AsyncRwLock<B>,
2529        ranges: &mut HashMap<u64, RangeInclusive<u64>>,
2530    ) {
2531        let t = table.read().await;
2532
2533        for i in 0..t.entries() {
2534            let e = t.get(i);
2535
2536            if e.get_value() != 0 {
2537                Self::add_used_cluster_to_set(ranges, e.get_value() >> self.info.cluster_bits());
2538            }
2539        }
2540    }
2541
2542    async fn add_refcount_table_clusters(
2543        &self,
2544        ranges: &mut HashMap<u64, RangeInclusive<u64>>,
2545    ) -> Qcow2Result<()> {
2546        let info = &self.info;
2547        let rt_range = {
2548            let h = self.header.read().await;
2549
2550            h.reftable_offset()
2551                ..(h.reftable_offset() + (h.reftable_clusters() << info.cluster_bits()) as u64)
2552        };
2553
2554        for c in rt_range {
2555            Self::add_used_cluster_to_set(ranges, c >> self.info.cluster_bits());
2556        }
2557
2558        Ok(())
2559    }
2560    async fn add_l1_table_clusters(
2561        &self,
2562        ranges: &mut HashMap<u64, RangeInclusive<u64>>,
2563    ) -> Qcow2Result<()> {
2564        let info = &self.info;
2565        let cls_size = info.cluster_size();
2566        let l1_range = {
2567            let h = self.header.read().await;
2568            let l1_size = (self.l1table.read().await.byte_size() + cls_size - 1) & !(cls_size - 1);
2569
2570            h.l1_table_offset()..(h.l1_table_offset() + l1_size as u64)
2571        };
2572
2573        for c in l1_range {
2574            Self::add_used_cluster_to_set(ranges, c >> self.info.cluster_bits());
2575        }
2576
2577        Ok(())
2578    }
2579
2580    async fn add_data_clusters(
2581        &self,
2582        ranges: &mut HashMap<u64, RangeInclusive<u64>>,
2583    ) -> Qcow2Result<(usize, usize)> {
2584        let info = &self.info;
2585        let end = info.virtual_size();
2586        let mut allocated = 0;
2587        let mut compressed = 0;
2588
2589        for start in (0..end).step_by(1 << info.cluster_bits()) {
2590            let mapping = self.get_mapping(start).await?;
2591
2592            match mapping.source {
2593                MappingSource::Zero | MappingSource::Unallocated | MappingSource::Backing => {}
2594                MappingSource::DataFile => {
2595                    if let Some(off) = mapping.cluster_offset {
2596                        allocated += 1;
2597                        Self::add_used_cluster_to_set(ranges, off >> self.info.cluster_bits());
2598                    }
2599                }
2600                MappingSource::Compressed => {
2601                    if let Some(off) = mapping.cluster_offset {
2602                        let start = off >> info.cluster_bits();
2603                        let end = (off + (mapping.compressed_length.unwrap() as u64))
2604                            >> info.cluster_bits();
2605                        for off in start..=end {
2606                            Self::add_used_cluster_to_set(ranges, off);
2607                        }
2608                        allocated += 1;
2609                        compressed += 1;
2610                    }
2611                }
2612            }
2613        }
2614        Ok((allocated, compressed))
2615    }
2616
2617    fn is_allocated_cluster_in_use(set: &Vec<&RangeInclusive<u64>>, cluster: u64) -> bool {
2618        for range in set {
2619            if range.contains(&cluster) {
2620                return true;
2621            }
2622        }
2623        false
2624    }
2625
2626    /// Report host cluster usage, such as allocated clusters and how many
2627    /// of them are compressed, ...
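    ///
    /// A minimal usage sketch (not compiled here), printing every category of
    /// cluster ranges reported through the `cls_usage` callback:
    ///
    /// ```ignore
    /// dev.qcow2_cluster_usage(|name, ranges, stat| {
    ///     println!("{}: {} range(s), stat {:?}", name, ranges.len(), stat);
    ///     for r in ranges {
    ///         println!("  clusters {}..={}", r.start(), r.end());
    ///     }
    /// })
    /// .await?;
    /// ```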
2628    pub async fn qcow2_cluster_usage<F>(&self, cls_usage: F) -> Qcow2Result<()>
2629    where
2630        F: Fn(&str, &Vec<&RangeInclusive<u64>>, Option<(usize, usize)>),
2631    {
2632        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2633        self.add_refcount_table_clusters(&mut set).await?;
2634        let mut this_res: Vec<_> = set.values().collect();
2635        this_res.sort_by_key(|range| *range.start());
2636        this_res.dedup();
2637        cls_usage("refcount_table", &this_res, None);
2638
2639        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2640        self.add_l1_table_clusters(&mut set).await?;
2641        let mut this_res: Vec<_> = set.values().collect();
2642        this_res.sort_by_key(|range| *range.start());
2643        this_res.dedup();
2644        cls_usage("l1_table", &this_res, None);
2645
2646        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2647        self.add_table_clusters(&self.l1table, &mut set).await;
2648        let mut this_res: Vec<_> = set.values().collect();
2649        this_res.sort_by_key(|range| *range.start());
2650        this_res.dedup();
2651        cls_usage("l2_tables", &this_res, None);
2652
2653        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2654        self.add_table_clusters(&self.reftable, &mut set).await;
2655        let mut this_res: Vec<_> = set.values().collect();
2656        this_res.sort_by_key(|range| *range.start());
2657        this_res.dedup();
2658        cls_usage("refblock_tables", &this_res, None);
2659
2660        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2661        let stat_res = self.add_data_clusters(&mut set).await?;
2662        let mut this_res: Vec<_> = set.values().collect();
2663        this_res.sort_by_key(|range| *range.start());
2664        this_res.dedup();
2665        cls_usage("data", &this_res, Some(stat_res));
2666
2667        Ok(())
2668    }
2669
2670    /// check if any cluster is leaked
2671    async fn check_cluster_leak(&self) -> Qcow2Result<bool> {
2672        let info = &self.info;
2673        let mut set: HashMap<u64, RangeInclusive<u64>> = HashMap::new();
2674        let mut res = false;
2675
2676        // add the header cluster into the set
2677        Self::add_used_cluster_to_set(&mut set, 0);
2678
2679        self.add_refcount_table_clusters(&mut set).await?;
2680        self.add_l1_table_clusters(&mut set).await?;
2681        self.add_table_clusters(&self.l1table, &mut set).await;
2682        self.add_table_clusters(&self.reftable, &mut set).await;
2683        let _ = self.add_data_clusters(&mut set).await?;
2684
2685        let mut result: Vec<_> = set.values().collect();
2686        result.sort_by_key(|range| *range.start());
2687        result.dedup();
2688
2689        for range in &result {
2690            log::debug!("{:?}", range);
2691        }
2692
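        // upper bound of the allocated host space: each in-use reftable entry
        // addresses one refblock, and a refblock covers 2^rb_index_shift
        // clusters of 2^cluster_bits bytes each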
2693        let max_allocated: u64 = {
2694            let rt = self.reftable.read().await;
2695            let mut idx = 0;
2696
2697            while idx < rt.entries() {
2698                if rt.get(idx).is_zero() {
2699                    break;
2700                }
2701                idx += 1;
2702            }
2703
2704            ((idx + 1) as u64) << ((info.rb_index_shift as usize) + info.cluster_bits())
2705        };
2706
2707        log::debug!(
2708            "start leak check: virt size {:x} max_allocated {:x}",
2709            info.virtual_size(),
2710            max_allocated
2711        );
2712        for start in (0..max_allocated).step_by(1 << info.cluster_bits()) {
2713            let allocated = self.cluster_is_allocated(start).await?;
2714            if !allocated {
2715                continue;
2716            }
2717
2718            if !Self::is_allocated_cluster_in_use(&result, start >> info.cluster_bits()) {
2719                eprintln!(
2720                    "cluster {:x}/{} is leaked",
2721                    start,
2722                    start >> info.cluster_bits()
2723                );
2724                res = true;
2725            }
2726        }
2727
2728        Ok(res)
2729    }
2730
2731    async fn cluster_is_allocated(&self, host_cluster: u64) -> Qcow2Result<bool> {
2732        let cls = HostCluster(host_cluster);
2733        let rt_entry = {
2734            let rt_index = cls.rt_index(&self.info);
2735            let reftable = self.reftable.read().await;
2736            reftable.get(rt_index)
2737        };
2738
2739        if rt_entry.is_zero() {
2740            return Ok(false);
2741        }
2742
2743        let rb_handle = self.get_refblock(&cls, &rt_entry).await?;
2744        let rb = rb_handle.value().read().await;
2745
2746        if rb.get(cls.rb_slice_index(&self.info)).is_zero() {
2747            Ok(false)
2748        } else {
2749            Ok(true)
2750        }
2751    }
2752
2753    async fn check_cluster(&self, virt_off: u64, cluster: Option<u64>) -> Qcow2Result<()> {
2754        match cluster {
2755            None => Ok(()),
2756            Some(host_cluster) => {
2757                match self.cluster_is_allocated(host_cluster).await? {
2758                    false => {
2759                        eprintln!(
2760                            "virt_offset {:x} pointed to non-allocated cluster {:x}",
2761                            virt_off, host_cluster
2762                        );
2763                    }
2764                    true => {}
2765                }
2766                Ok(())
2767            }
2768        }
2769    }
2770
2771    /// no need to check backing & compressed mappings, which are read-only
2772    async fn check_single_mapping(&self, off: u64, mapping: Mapping) -> Qcow2Result<()> {
2773        match mapping.source {
2774            MappingSource::Zero
2775            | MappingSource::Unallocated
2776            | MappingSource::Backing
2777            | MappingSource::Compressed => Ok(()),
2778            MappingSource::DataFile => self.check_cluster(off, mapping.cluster_offset).await,
2779        }
2780    }
2781
2782    /// check if every cluster pointed by mapping is valid
2783    async fn check_mapping(&self) -> Qcow2Result<()> {
2784        let info = &self.info;
2785        let end = info.virtual_size();
2786
2787        for start in (0..end).step_by(1 << info.cluster_bits()) {
2788            let mapping = self.get_mapping(start).await?;
2789
2790            self.check_single_mapping(start, mapping).await?;
2791        }
2792        Ok(())
2793    }
2794
2795    /// Check Qcow2 meta integrity and cluster leak
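    ///
    /// A minimal usage sketch (not compiled here):
    ///
    /// ```ignore
    /// // optionally flush dirty meta first, then verify the image
    /// if dev.need_flush_meta() {
    ///     dev.flush_meta().await?;
    /// }
    /// dev.check().await?;
    /// ```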
2796    pub async fn check(&self) -> Qcow2Result<()> {
2797        self.check_mapping().await?;
2798
2799        if self.check_cluster_leak().await? {
2800            return Err("check: cluster leak".into());
2801        }
2802        Ok(())
2803    }
2804
2805    pub(crate) async fn __qcow2_prep_io(&self) -> Qcow2Result<()> {
2806        self.load_l1_table().await?;
2807        self.load_refcount_table().await?;
2808
2809        Ok(())
2810    }
2811
2812    /// Prepare everything (loading the l1/refcount tables) for handling any qcow2 IO
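    ///
    /// A minimal usage sketch (not compiled here; `qcow2_setup_dev_tokio()`
    /// is the tokio helper used in the tests below, and it may already
    /// perform this step internally):
    ///
    /// ```ignore
    /// let dev = qcow2_setup_dev_tokio(&path, &params).await?;
    /// dev.qcow2_prep_io().await?;
    /// // the device is now ready for read_at()/write_at()
    /// ```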
2813    #[async_recursion(?Send)]
2814    pub async fn qcow2_prep_io(&self) -> Qcow2Result<()> {
2815        match &self.backing_file {
2816            Some(back) => back.qcow2_prep_io().await?,
2817            None => {}
2818        };
2819        self.__qcow2_prep_io().await
2820    }
2821}
2822
2823#[cfg(test)]
2824mod tests {
2825    use crate::dev::*;
2826    use crate::helpers::Qcow2IoBuf;
2827    use crate::qcow2_default_params;
2828    use crate::tokio_io::Qcow2IoTokio;
2829    use crate::utils::{make_temp_qcow2_img, qcow2_setup_dev_tokio};
2830    use std::path::PathBuf;
2831    use tokio::runtime::Runtime;
2832
2833    #[test]
2834    fn test_qcow2_dev_allocater_small() {
2835        let rt = Runtime::new().unwrap();
2836        rt.block_on(async move {
2837            let to_alloc = 180;
2838            let size = 64_u64 << 20;
2839            let cluster_bits = 16;
2840            let img_file = make_temp_qcow2_img(size, cluster_bits, 4);
2841            let path = PathBuf::from(img_file.path());
2842            let params = qcow2_default_params!(false, false);
2843
2844            let dev = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2845            let allocated = dev.count_alloc_clusters().await.unwrap();
2846            assert!(allocated >= 3);
2847
2848            let res = dev.allocate_clusters(to_alloc).await.unwrap().unwrap();
2849            dev.flush_meta().await.unwrap();
2850
2851            let dev2 = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2852            let curr = dev2.count_alloc_clusters().await.unwrap();
2853
2854            // count and check that the new allocation is accounted for
2855            assert!(allocated + res.1 == curr);
2856
2857            // free last allocation
2858            dev.free_clusters(res.0, res.1).await.unwrap();
2859            dev.flush_meta().await.unwrap();
2860
2861            let dev3 = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2862            let curr = dev3.count_alloc_clusters().await.unwrap();
2863
2864            // check that the last free completed successfully
2865            assert!(allocated == curr);
2866        });
2867    }
2868
2869    #[test]
2870    fn test_qcow2_dev_allocater_big() {
2871        let rt = Runtime::new().unwrap();
2872        rt.block_on(async move {
2873            let size = 256_u64 << 20;
2874            let cluster_bits = 16;
2875            let img_file = make_temp_qcow2_img(size, cluster_bits, 4);
2876            let path = PathBuf::from(img_file.path());
2877            let params = qcow2_default_params!(false, false);
2878
2879            let dev = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2880            let allocated = dev.count_alloc_clusters().await.unwrap();
2881            assert!(allocated >= 3);
2882
2883            let to_alloc = dev.info.rb_entries();
2884
2885            let res = dev.allocate_clusters(to_alloc).await.unwrap().unwrap();
2886            dev.flush_meta().await.unwrap();
2887
2888            let dev2 = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2889            let curr = dev2.count_alloc_clusters().await.unwrap();
2890
2891            /*
2892            println!(
2893                "original {} allocated {}/{} now_allocated {}",
2894                allocated, to_alloc, res.1, curr
2895            );
2896            */
2897
2898            // count and check that the new allocation is accounted for
2899            assert!(allocated + res.1 == curr);
2900
2901            // free last allocation
2902            dev.free_clusters(res.0, res.1).await.unwrap();
2903            if dev.need_flush_meta() {
2904                dev.flush_meta().await.unwrap();
2905            }
2906
2907            let dev3 = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2908            let curr = dev3.count_alloc_clusters().await.unwrap();
2909
2910            // check that the last free completed successfully
2911            assert!(allocated == curr);
2912        });
2913    }
2914
2915    #[test]
2916    fn test_qcow2_dev_allocater_single() {
2917        let rt = Runtime::new().unwrap();
2918        rt.block_on(async move {
2919            let size = 256_u64 << 20;
2920            let cluster_bits = 16;
2921            let img_file = make_temp_qcow2_img(size, cluster_bits, 4);
2922            let path = PathBuf::from(img_file.path());
2923            let params = qcow2_default_params!(false, false);
2924
2925            let dev = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2926            let allocated = dev.count_alloc_clusters().await.unwrap();
2927            assert!(allocated >= 3);
2928
2929            let to_alloc = dev.info.rb_entries();
2930            for _ in 0..to_alloc {
2931                let res = dev.allocate_clusters(1).await.unwrap().unwrap();
2932
2933                assert!(res.1 == 1);
2934            }
2935            if dev.need_flush_meta() {
2936                dev.flush_meta().await.unwrap();
2937            }
2938
2939            let dev2 = qcow2_setup_dev_tokio(&path, &params).await.unwrap();
2940            let curr = dev2.count_alloc_clusters().await.unwrap();
2941
2942            // 1 extra cluster is for refblock
2943            assert!(allocated + to_alloc + 1 == curr);
2944        });
2945    }
2946
2947    #[test]
2948    fn test_qcow2_dev_io() {
2949        let rt = Runtime::new().unwrap();
2950        rt.block_on(async move {
2951            let size = 64_u64 << 20;
2952            let img_file = make_temp_qcow2_img(size, 16, 4);
2953            let io = Qcow2IoTokio::new(&img_file.path().to_path_buf(), true, false).await;
2954            let mut buf = Qcow2IoBuf::<u8>::new(4096);
2955            let _ = io.read_to(0, &mut buf).await;
2956            let header = Qcow2Header::from_buf(&buf).unwrap();
2957
2958            assert!(header.size() == size);
2959            assert!(header.cluster_bits() == 16);
2960        });
2961    }
2962}