1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::backing_store::PartitionStore;
use crate::ctx::Ctx;
use crate::objects::format_device_for_objects;
use crate::objects::load_objects_from_device;
use crate::objects::LoadedObjects;
use crate::pages::Pages;
use crate::util::ceil_pow2;
use crate::util::floor_pow2;
use crate::BlobdCfg;
use cadence::StatsdClient;
use parking_lot::Mutex;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::spawn;
use tracing::info;

/// Per-partition layout and handles, computed once at startup.
///
/// Built by [`PartitionLoader::new`], then consumed either by `format` (to
/// initialise an empty partition) or by `load_and_start` (to load existing
/// objects and produce a running [`Partition`]).
pub(crate) struct PartitionLoader {
  // Backing store bounded to this partition's region of the device.
  dev: PartitionStore,
  pages: Pages,
  // Optional metrics sink; `None` when statsd reporting is disabled in cfg.
  statsd: Option<Arc<StatsdClient>>,
  partition_idx: usize,
  // Byte offset within the partition where the heap begins; the range
  // [0, heap_dev_offset) is the object tuples area (lpage-aligned).
  heap_dev_offset: u64,
  // Size in bytes of the heap region [heap_dev_offset, heap_dev_offset + heap_size).
  heap_size: u64,
}

impl PartitionLoader {
  /// Computes this partition's layout and captures the handles needed to
  /// later format it or load it.
  ///
  /// Layout: the object tuples area occupies `[0, heap_dev_offset)`, with its
  /// size rounded up to a whole lpage; the heap occupies
  /// `[heap_dev_offset, heap_end)`, where `heap_end` is the partition length
  /// rounded down to a whole lpage.
  ///
  /// # Panics
  /// Panics if the partition is too small to hold the reserved tuples area
  /// plus a non-negative heap.
  pub fn new(
    partition_idx: usize,
    partition_store: PartitionStore,
    cfg: BlobdCfg,
    pages: Pages,
  ) -> Self {
    let tuples_area_size = ceil_pow2(cfg.object_tuples_area_reserved_space, cfg.lpage_size_pow2);
    let heap_dev_offset = tuples_area_size;
    let heap_end = floor_pow2(partition_store.len(), pages.lpage_size_pow2);
    // Validate the layout BEFORE computing `heap_size`: the original order
    // performed `heap_end - heap_dev_offset` first, which on an undersized
    // partition panics with an opaque u64 overflow in debug builds (and wraps
    // silently in release) before the assert could fire. The condition below
    // implies `heap_dev_offset <= heap_end`, so the subtraction is safe after it.
    assert!(
      tuples_area_size + heap_dev_offset <= heap_end,
      "partition too small: tuples area ({tuples_area_size}) + heap offset ({heap_dev_offset}) exceeds usable end ({heap_end})",
    );
    let heap_size = heap_end - heap_dev_offset;

    info!(
      partition_number = partition_idx,
      partition_offset = partition_store.offset(),
      partition_size = partition_store.len(),
      heap_dev_offset = heap_dev_offset,
      heap_size,
      "init partition",
    );

    Self {
      dev: partition_store,
      heap_dev_offset,
      heap_size,
      pages,
      partition_idx,
      // `cfg` is owned and unused after this point, so move the field instead
      // of cloning it.
      statsd: cfg.statsd,
    }
  }

  /// Formats the tuples area (the region before the heap) on the device and
  /// syncs so the empty state is durable.
  pub async fn format(&self) {
    format_device_for_objects(self.dev.bounded(0, self.heap_dev_offset), &self.pages).await;
    self.dev.sync().await;
  }

  /// Loads all objects from the device, builds the shared [`Ctx`], and spawns
  /// the background tuple-commit loop. Consumes the loader and returns the
  /// running [`Partition`].
  pub async fn load_and_start(self) -> Partition {
    let dev = &self.dev;
    let pages = &self.pages;

    let LoadedObjects {
      committed_objects,
      heap_allocator,
      incomplete_objects,
      next_object_id,
      tuples,
    } = load_objects_from_device(
      dev.clone(),
      pages.clone(),
      self.statsd.clone(),
      self.heap_dev_offset,
      self.heap_size,
    )
    .await;

    let ctx = Arc::new(Ctx {
      committed_objects,
      device: self.dev.clone(),
      heap_allocator: Mutex::new(heap_allocator),
      incomplete_objects,
      next_object_id: AtomicU64::new(next_object_id),
      pages: pages.clone(),
      partition_idx: self.partition_idx,
      statsd: self.statsd,
      // Cloned because the original handle moves into the spawned task below.
      tuples: tuples.clone(),
    });

    // The commit loop persists tuple state into the tuples area (the region
    // before the heap), so it gets a device view bounded to that region.
    spawn({
      let dev = dev.bounded(0, self.heap_dev_offset);
      let pages = pages.clone();
      async move { tuples.start_background_commit_loop(dev, pages).await }
    });

    Partition { ctx }
  }
}

/// A loaded, running partition: wraps the shared [`Ctx`] produced by
/// [`PartitionLoader::load_and_start`].
pub(crate) struct Partition {
  pub ctx: Arc<Ctx>,
}