1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#[cfg(test)]
pub mod tests;

use crate::metrics::METRIC_ALLOCATED_BYTES;
use crate::pages::Pages;
use crate::util::div_pow2;
use crate::util::mod_pow2;
use crate::util::mul_pow2;
use cadence::Counted;
use cadence::StatsdClient;
use off64::u32;
use off64::u8;
use off64::usz;
use roaring::RoaringBitmap;
use std::cmp::max;
use std::error::Error;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use struct_name::StructName;
use struct_name_macro::StructName;

/// Error returned when an allocation request cannot be satisfied because no
/// sufficiently large free page remains in the heap.
#[derive(PartialEq, Eq, Clone, Copy, Debug, StructName)]
pub(crate) struct OutOfSpaceError;

impl Display for OutOfSpaceError {
  // Renders the error as the type's own name, as provided by the `StructName` derive.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{}", Self::struct_name())
  }
}

impl Error for OutOfSpaceError {}

/// Direction from which the allocator hands out pages within the heap.
#[derive(PartialEq, Eq, Clone, Copy)]
pub(crate) enum AllocDir {
  // Prefer the lowest-numbered free page (allocate from the start of the heap).
  Left,
  // Prefer the highest-numbered free page (allocate from the end of the heap).
  Right,
}

type PageNum = u32;

/// Buddy-style page allocator over a contiguous heap region of a device.
///
/// Free pages are tracked per power-of-two size class; larger pages are split
/// on demand and buddy pages are merged back together on release.
pub(crate) struct Allocator {
  // Device offset of the start of the heap; all page numbers are relative to this.
  base_dev_offset: u64,
  // Whether allocations are taken from the low (Left) or high (Right) end of the heap.
  dir: AllocDir,
  // One for each page size, from spage_size_pow2 up to lpage_size_pow2 inclusive.
  free: Vec<RoaringBitmap>,
  pages: Pages,
  // Optional metrics sink; used to track METRIC_ALLOCATED_BYTES.
  statsd: Option<Arc<StatsdClient>>,
}

impl Allocator {
  /// Creates an allocator managing `heap_size` bytes of device space starting
  /// at `heap_dev_offset`.
  ///
  /// Both values must be aligned to the largest page size (lpage). Initially
  /// the entire heap is free, represented as whole lpages in the largest
  /// bitmap; smaller free pages only come into existence when a larger page is
  /// split by an allocation.
  pub fn new(
    heap_dev_offset: u64,
    heap_size: u64,
    pages: Pages,
    dir: AllocDir,
    statsd: Option<Arc<StatsdClient>>,
  ) -> Self {
    assert_eq!(mod_pow2(heap_dev_offset, pages.lpage_size_pow2), 0);
    assert_eq!(mod_pow2(heap_size, pages.lpage_size_pow2), 0);
    Self {
      base_dev_offset: heap_dev_offset,
      dir,
      // One bitmap per page size in [spage, lpage]. Only the lpage bitmap
      // starts populated.
      free: (pages.spage_size_pow2..=pages.lpage_size_pow2)
        .map(|sz| {
          let page_count = u32!(div_pow2(heap_size, sz));
          let mut map = RoaringBitmap::new();
          if sz == pages.lpage_size_pow2 {
            map.insert_range(..page_count);
          };
          map
        })
        .collect(),
      pages,
      statsd,
    }
  }

  // Free-page bitmap for the given page size class.
  fn bitmap(&self, page_size_pow2: u8) -> &RoaringBitmap {
    &self.free[usz!(page_size_pow2 - self.pages.spage_size_pow2)]
  }

  // Mutable free-page bitmap for the given page size class.
  fn bitmap_mut(&mut self, page_size_pow2: u8) -> &mut RoaringBitmap {
    &mut self.free[usz!(page_size_pow2 - self.pages.spage_size_pow2)]
  }

  // Converts an absolute device offset into a page number within its size class.
  fn to_page_num(&self, page_dev_offset: u64, page_size_pow2: u8) -> PageNum {
    u32!(div_pow2(
      page_dev_offset - self.base_dev_offset,
      page_size_pow2
    ))
  }

  // Converts a page number within a size class back into an absolute device offset.
  fn to_page_dev_offset(&self, page_num: u32, page_size_pow2: u8) -> u64 {
    mul_pow2(page_num.into(), page_size_pow2) + self.base_dev_offset
  }

  // Records a change in allocated bytes if a statsd client is configured.
  // Positive delta for allocations, negative for releases. Centralised here so
  // all three public entry points report consistently.
  fn record_allocated_bytes(&self, delta: i64) {
    if let Some(s) = &self.statsd {
      s.count(METRIC_ALLOCATED_BYTES, delta).unwrap();
    };
  }

  // Removes `page_num` from the free bitmap of its size class. If it isn't
  // free at this size, it must currently be part of a free larger page:
  // recursively claim the parent, then split it by freeing our buddy.
  fn _mark_as_allocated(&mut self, page_num: PageNum, page_size_pow2: u8) {
    assert!(
      page_size_pow2 >= self.pages.spage_size_pow2 && page_size_pow2 <= self.pages.lpage_size_pow2
    );
    if !self.bitmap_mut(page_size_pow2).remove(page_num) {
      // We need to split parent. (At the lpage level there is no parent; the
      // recursion's assert will catch a double-allocation.)
      self._mark_as_allocated(page_num / 2, page_size_pow2 + 1);
      // The buddy half of the split parent becomes free at this size.
      self.bitmap_mut(page_size_pow2).insert(page_num ^ 1);
    };
  }

  /// Marks the page at `page_dev_offset` of size `2^page_size_pow2` as
  /// allocated, e.g. when replaying preexisting allocations on startup.
  /// The offset must be aligned to the page size.
  pub fn mark_as_allocated(&mut self, page_dev_offset: u64, page_size_pow2: u8) {
    assert_eq!(mod_pow2(page_dev_offset, page_size_pow2), 0);

    self._mark_as_allocated(
      self.to_page_num(page_dev_offset, page_size_pow2),
      page_size_pow2,
    );

    self.record_allocated_bytes(1i64 << page_size_pow2);
  }

  /// Returns the page number.
  fn _allocate(&mut self, page_size_pow2: u8) -> Result<PageNum, OutOfSpaceError> {
    assert!(
      page_size_pow2 >= self.pages.spage_size_pow2 && page_size_pow2 <= self.pages.lpage_size_pow2
    );
    // Take from the heap end matching the allocation direction so usage
    // stays compacted towards that end.
    match if self.dir == AllocDir::Left {
      self.bitmap(page_size_pow2).min()
    } else {
      self.bitmap(page_size_pow2).max()
    } {
      Some(page_num) => {
        assert!(self.bitmap_mut(page_size_pow2).remove(page_num));
        Ok(page_num)
      }
      None if page_size_pow2 == self.pages.lpage_size_pow2 => Err(OutOfSpaceError),
      None => {
        // Find or create a larger page.
        let larger_page_num = self._allocate(page_size_pow2 + 1)?;
        // Split the larger page in two, and release left/right page depending on allocation direction (we'll take the other one).
        Ok(match self.dir {
          // We'll take the left page, so free the right one.
          AllocDir::Left => {
            assert!(self
              .bitmap_mut(page_size_pow2)
              .insert(larger_page_num * 2 + 1));
            larger_page_num * 2
          }
          // We'll take the right page, so free the left one.
          AllocDir::Right => {
            assert!(self.bitmap_mut(page_size_pow2).insert(larger_page_num * 2));
            larger_page_num * 2 + 1
          }
        })
      }
    }
  }

  /// Allocates a page large enough for `size` bytes (rounded up to the next
  /// power of two, at least one spage) and returns its device offset.
  ///
  /// # Errors
  /// Returns [`OutOfSpaceError`] if no sufficiently large page can be found
  /// or assembled.
  pub fn allocate(&mut self, size: u64) -> Result<u64, OutOfSpaceError> {
    let pow2 = max(
      self.pages.spage_size_pow2,
      u8!(size.next_power_of_two().ilog2()),
    );
    assert!(pow2 <= self.pages.lpage_size_pow2);
    let page_num = self._allocate(pow2)?;
    // We increment these metrics here instead of in `Pages::*`, `Allocator::insert_into_free_list`, `Allocator::allocate_page`, etc. as many of those are called during intermediate states, like merging/splitting pages, which aren't actual allocations.
    // Count only after `_allocate` has succeeded; counting before it (as the
    // previous version did) inflated the metric on OutOfSpaceError.
    self.record_allocated_bytes(1i64 << pow2);
    Ok(self.to_page_dev_offset(page_num, pow2))
  }

  fn _release(&mut self, page_num: PageNum, page_size_pow2: u8) {
    // Check if buddy is also free so we can recompact. This doesn't apply to lpages as they don't have buddies (they aren't split).
    if page_size_pow2 < self.pages.lpage_size_pow2 {
      let buddy_page_num = page_num ^ 1;
      if self.bitmap(page_size_pow2).contains(buddy_page_num) {
        // Buddy is also free.
        assert!(self.bitmap_mut(page_size_pow2).remove(buddy_page_num));
        assert!(!self.bitmap(page_size_pow2).contains(page_num));
        // Merge by freeing parent larger page.
        let parent_page_num = page_num / 2;
        self._release(parent_page_num, page_size_pow2 + 1);
        return;
      };
    };
    assert!(self.bitmap_mut(page_size_pow2).insert(page_num));
  }

  /// Releases the page at `page_dev_offset` of size `2^page_size_pow2` back to
  /// the free pool, merging with its buddy where possible.
  /// The offset must be aligned to the page size.
  pub fn release(&mut self, page_dev_offset: u64, page_size_pow2: u8) {
    // Same precondition as `mark_as_allocated`: a misaligned offset would
    // silently floor-divide to the wrong page number.
    assert_eq!(mod_pow2(page_dev_offset, page_size_pow2), 0);
    let page_num = self.to_page_num(page_dev_offset, page_size_pow2);
    // Similar to `allocate`, we need to change metrics here and use an internal function. See `allocate` for comment explaining why.
    self.record_allocated_bytes(-(1i64 << page_size_pow2));
    self._release(page_num, page_size_pow2);
  }
}