log_structured/lib.rs

use async_trait::async_trait;
use futures::join;
use off64::int::Off64ReadInt;
use parking_lot::Mutex;
use seekable_async_file::SeekableAsyncFile;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::time::sleep;
use write_journal::WriteJournal;

const STATE_OFFSETOF_HEAD: u64 = 0;
const STATE_OFFSETOF_TAIL: u64 = STATE_OFFSETOF_HEAD + 8;
pub const STATE_SIZE: u64 = STATE_OFFSETOF_TAIL + 8;

#[derive(Default)]
struct LogState {
  head: u64,
  tail: u64,
  // This is to prevent the scenario where a write at a later offset (i.e. subsequent request B) finishes before a write at an earlier offset (i.e. earlier request A); we can't immediately update the tail on disk after writing B because it would include A, which hasn't been synced yet.
  pending_tail_bumps: BTreeMap<u64, Option<SignalFutureController>>,
  // Necessary for GC to know where to safely read up to. `tail` may point past pending/partially-written data.
  tail_on_disk: u64,
}

#[derive(Clone, Copy)]
pub struct TailBump {
  pub acquired_physical_offset: u64,
  pub uncommitted_virtual_offset: u64,
}

pub enum GarbageCheck {
  IsGarbage(u64),
  IsNotGarbage,
  IsPadding,
}

#[async_trait]
pub trait GarbageChecker {
  async fn check_offset(&self, offset: u64) -> GarbageCheck;
}
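
// Illustrative sketch only (not part of this crate): a `GarbageChecker` backed by
// an in-memory index that records, for each physical offset it wrote, the entry's
// size and whether it is still live, plus the offsets where the padding indicator
// was written at a wrap point. The index itself is hypothetical; it only shows the
// contract: report garbage together with its size so the GC loop can advance
// `head` past it, report padding so the loop can jump back to the ring start, and
// report "not garbage" to stop the sweep at the first live entry.
pub struct ExampleGarbageChecker {
  // Physical offset -> (entry size in bytes, still live?).
  entries: Arc<Mutex<std::collections::HashMap<u64, (u64, bool)>>>,
  // Physical offsets at which padding was written.
  padding_offsets: Arc<Mutex<std::collections::HashSet<u64>>>,
}

#[async_trait]
impl GarbageChecker for ExampleGarbageChecker {
  async fn check_offset(&self, offset: u64) -> GarbageCheck {
    if self.padding_offsets.lock().contains(&offset) {
      return GarbageCheck::IsPadding;
    };
    match self.entries.lock().get(&offset) {
      Some((_, true)) => GarbageCheck::IsNotGarbage,
      Some((size, false)) => GarbageCheck::IsGarbage(*size),
      // Unknown offset: stop the sweep rather than guess.
      None => GarbageCheck::IsNotGarbage,
    }
  }
}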

pub struct LogStructured<GC: GarbageChecker> {
  device_offset: u64,
  device_size: u64,
  device: SeekableAsyncFile,
  free_space_gauge: Arc<AtomicU64>,
  garbage_checker: GC,
  journal: Arc<WriteJournal>,
  log_state: Mutex<LogState>,
  padding_indicator: Vec<u8>,
}

impl<GC: GarbageChecker> LogStructured<GC> {
  pub fn new(
    device: SeekableAsyncFile,
    device_offset: u64,
    device_size: u64,
    journal: Arc<WriteJournal>,
    garbage_checker: GC,
    padding_indicator: Vec<u8>,
    free_space_gauge: Arc<AtomicU64>,
  ) -> Self {
    Self {
      device_offset,
      device_size,
      device,
      free_space_gauge,
      garbage_checker,
      journal,
      log_state: Mutex::new(LogState::default()),
      padding_indicator,
    }
  }

  fn reserved_size(&self) -> u64 {
    self.device_offset + STATE_SIZE
  }

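  // Maps a monotonically increasing virtual offset onto the data ring that follows
  // the reserved state region. Worked example with hypothetical numbers: if
  // `device_offset` is 0 and `device_size` is 1040, the reserved region is 16 bytes
  // and the ring capacity is 1024 bytes, so virtual offset 3000 maps to physical
  // offset 16 + (3000 % 1024) = 968. Virtual offsets never wrap; only their
  // projection onto the ring does.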
  pub fn physical_offset(&self, virtual_offset: u64) -> u64 {
    self.reserved_size() + (virtual_offset % (self.device_size - self.reserved_size()))
  }

  // How to use: bump the tail, perform the write at the acquired physical offset, then persist the bumped tail. If the tail were committed before the write was persisted and the write never completed, the committed tail would point at invalid data.
  pub async fn bump_tail(&self, usage: usize) -> TailBump {
    let usage: u64 = usage.try_into().unwrap();
    assert!(usage > 0);
    if usage > self.device_size - self.device_offset {
      panic!("out of storage space");
    };

    let (physical_offset, new_tail, write_filler_at) = {
      let mut state = self.log_state.lock();
      let mut physical_offset = self.physical_offset(state.tail);
      let mut write_filler_at = None;
      if physical_offset + usage >= self.device_size {
        // The entry would run past the end of the device, so pad out the rest of the ring and wrap to the start. Perform the filler write after releasing the lock (performance) and after the out-of-space check below (safety).
        write_filler_at = Some(physical_offset);
        let filler = self.device_size - physical_offset;
        physical_offset = self.reserved_size();
        state.tail += filler;
      };

      state.tail += usage;
      let new_tail = state.tail;
      if new_tail - state.head > self.device_size - self.reserved_size() {
        panic!("out of storage space");
      };

      let None = state.pending_tail_bumps.insert(new_tail, None) else {
        unreachable!();
      };
      self
        .free_space_gauge
        .store(state.tail - state.head, Ordering::Relaxed);
      (physical_offset, new_tail, write_filler_at)
    };

    if let Some(write_filler_at) = write_filler_at {
      // TODO Prove safety.
      self
        .device
        .write_at(write_filler_at, self.padding_indicator.clone())
        .await;
    };

    TailBump {
      acquired_physical_offset: physical_offset,
      uncommitted_virtual_offset: new_tail,
    }
  }

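  // Waits until this bump's tail offset has been persisted. Tail bumps are
  // committed in order by the background loop, so this also waits for every
  // earlier bump that is still pending.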
  pub async fn commit_tail_bump(&self, bump: TailBump) {
    let (fut, fut_ctl) = SignalFuture::new();

    {
      let mut state = self.log_state.lock();

      *state
        .pending_tail_bumps
        .get_mut(&bump.uncommitted_virtual_offset)
        .unwrap() = Some(fut_ctl.clone());
    };

    fut.await;
  }
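
  // Usage sketch (hypothetical helper, not part of the original API): appends one
  // entry following the order required by `bump_tail`: reserve space, write the
  // payload at the acquired physical offset, then commit the bump. Committing only
  // after the write has been issued means a persisted tail never points past data
  // that was never written.
  pub async fn append_entry_sketch(&self, entry: Vec<u8>) {
    // 1) Reserve `entry.len()` bytes at the current tail.
    let bump = self.bump_tail(entry.len()).await;
    // 2) Write the entry at the acquired physical offset (a plain `write_at` here;
    //    a real caller may prefer a write variant that also syncs).
    self
      .device
      .write_at(bump.acquired_physical_offset, entry)
      .await;
    // 3) Persist the new tail; resolves once the background commit loop has journalled it.
    self.commit_tail_bump(bump).await;
  }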

  // Periodically advances `head` past entries that the `GarbageChecker` reports as garbage (or past padding at a wrap point), then persists the new head through the journal.
  async fn start_background_garbage_collection_loop(&self) {
    loop {
      sleep(std::time::Duration::from_secs(10)).await;

      // SAFETY:
      // - `head` is only ever modified by us so it's always what we expect.
      // - `tail` can be modified by others at any time but it only ever increases. If it physically reaches `head` (i.e. out of space), we panic.
      // - Data is never erased; only we can move the `head` to mark areas as free again but even then no data is written/cleared.
      // - Written log entries are never mutated/written to again, so we don't have to worry about other writers.
      // - Therefore, it's always safe to read from `head` to `tail`.
      let (orig_head, tail) = {
        let state = self.log_state.lock();
        (state.head, state.tail_on_disk)
      };
      let mut head = orig_head;
      while head < tail {
        let physical_offset = self.physical_offset(head);
        let res = self.garbage_checker.check_offset(physical_offset).await;
        match res {
          GarbageCheck::IsGarbage(ent_size) => {
            head += ent_size;
          }
          GarbageCheck::IsNotGarbage => {
            break;
          }
          GarbageCheck::IsPadding => {
            head += self.device_size - physical_offset;
          }
        };
      }
      if head != orig_head {
        let mut txn = self.journal.begin_transaction();
        txn.write(self.device_offset + STATE_OFFSETOF_HEAD, head.to_be_bytes());
        self.journal.commit_transaction(txn).await;
        let mut state = self.log_state.lock();
        state.head = head;
        self
          .free_space_gauge
          .store(state.tail - state.head, Ordering::Relaxed);
      };
    }
  }

  // Periodically drains the longest prefix of `pending_tail_bumps` whose callers have already requested a commit, persists the highest tail offset in that prefix through the journal, then resolves the waiting `commit_tail_bump` futures.
  async fn start_background_tail_bump_commit_loop(&self) {
    loop {
      sleep(std::time::Duration::from_micros(200)).await;

      let mut to_resolve = vec![];
      let mut new_tail_to_write = None;
      {
        let mut state = self.log_state.lock();
        loop {
          let Some(e) = state.pending_tail_bumps.first_entry() else {
            break;
          };
          if e.get().is_none() {
            break;
          };
          let (k, fut_state) = e.remove_entry();
          to_resolve.push(fut_state.unwrap());
          new_tail_to_write = Some(k);
        }
        if let Some(tail) = new_tail_to_write {
          state.tail_on_disk = tail;
        };
      };

      if let Some(new_tail_to_write) = new_tail_to_write {
        let mut txn = self.journal.begin_transaction();
        txn.write(
          self.device_offset + STATE_OFFSETOF_TAIL,
          new_tail_to_write.to_be_bytes(),
        );
        self.journal.commit_transaction(txn).await;

        for ft in to_resolve {
          ft.signal(());
        }
      };
    }
  }

  pub async fn start_background_loops(&self) {
    join! {
      self.start_background_garbage_collection_loop(),
      self.start_background_tail_bump_commit_loop(),
    };
  }

  pub async fn format_device(&self) {
    self
      .device
      .write_at(
        self.device_offset + STATE_OFFSETOF_HEAD,
        0u64.to_be_bytes().to_vec(),
      )
      .await;
    self
      .device
      .write_at(
        self.device_offset + STATE_OFFSETOF_TAIL,
        0u64.to_be_bytes().to_vec(),
      )
      .await;
  }

  pub fn get_head_and_tail(&self) -> (u64, u64) {
    let log_state = self.log_state.lock();
    (log_state.head, log_state.tail)
  }

  pub async fn load_state_from_device(&self) {
    let head = self
      .device
      .read_at(self.device_offset + STATE_OFFSETOF_HEAD, 8)
      .await
      .read_u64_be_at(0);
    let tail = self
      .device
      .read_at(self.device_offset + STATE_OFFSETOF_TAIL, 8)
      .await
      .read_u64_be_at(0);
    self.free_space_gauge.store(tail - head, Ordering::Relaxed);
    {
      let mut log_state = self.log_state.lock();
      log_state.head = head;
      log_state.tail = tail;
    };
  }
}
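
// Startup sketch, assuming the caller has already opened the `SeekableAsyncFile`
// and built the `WriteJournal` that covers the same device (their construction is
// not shown here). All names below are illustrative, not part of this crate's API:
// format the reserved state region on first use, load the persisted head/tail,
// then run the never-returning background GC and tail-commit loops on their own task.
pub async fn open_log_sketch<GC: GarbageChecker + Send + Sync + 'static>(
  device: SeekableAsyncFile,
  journal: Arc<WriteJournal>,
  garbage_checker: GC,
  device_offset: u64,
  device_size: u64,
  format: bool,
) -> Arc<LogStructured<GC>> {
  let log = Arc::new(LogStructured::new(
    device,
    device_offset,
    device_size,
    journal,
    garbage_checker,
    b"__log_padding__".to_vec(), // Hypothetical padding indicator bytes.
    Arc::new(AtomicU64::new(0)),
  ));
  if format {
    log.format_device().await;
  };
  log.load_state_from_device().await;
  // Assumes the device, journal and checker are all Send + Sync so the loops can
  // run on a separate Tokio task.
  tokio::spawn({
    let log = log.clone();
    async move { log.start_background_loops().await }
  });
  log
}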