use async_trait::async_trait;
use futures::join;
use off64::int::Off64ReadInt;
use parking_lot::Mutex;
use seekable_async_file::SeekableAsyncFile;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::time::sleep;
use write_journal::WriteJournal;

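// Layout of the persisted state at the start of the log's region on the
// device: the virtual head and tail offsets, each stored as a big-endian u64.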
const STATE_OFFSETOF_HEAD: u64 = 0;
const STATE_OFFSETOF_TAIL: u64 = STATE_OFFSETOF_HEAD + 8;
pub const STATE_SIZE: u64 = STATE_OFFSETOF_TAIL + 8;

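/// In-memory offsets. `head` and `tail` are virtual offsets that only ever
/// grow; `pending_tail_bumps` maps each uncommitted tail to the signal that
/// wakes its waiter once durable, and `tail_on_disk` is the last tail
/// persisted via the journal.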
#[derive(Default)]
struct LogState {
  head: u64,
  tail: u64,
  pending_tail_bumps: BTreeMap<u64, Option<SignalFutureController>>,
  tail_on_disk: u64,
}

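/// Result of reserving space at the tail: the physical offset to write the
/// entry at, and the new (not yet durable) virtual tail.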
#[derive(Clone, Copy)]
pub struct TailBump {
  pub acquired_physical_offset: u64,
  pub uncommitted_virtual_offset: u64,
}

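/// Classification of the entry at a physical offset: garbage of a known size,
/// live data, or the padding filler written when an entry could not fit
/// before the end of the device.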
pub enum GarbageCheck {
  IsGarbage(u64),
  IsNotGarbage,
  IsPadding,
}

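/// Implemented by users of the log to classify entries; called by the
/// background garbage collection loop.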
#[async_trait]
pub trait GarbageChecker {
  async fn check_offset(&self, offset: u64) -> GarbageCheck;
}

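/// A log-structured ring buffer over a region of a device. Entries live after
/// a small reserved state area; virtual offsets grow monotonically and are
/// mapped onto physical offsets modulo the usable space. Head and tail are
/// persisted through the `WriteJournal`, and `free_space_gauge` is kept
/// updated with the currently occupied span (`tail - head`).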
pub struct LogStructured<GC: GarbageChecker> {
  device_offset: u64,
  device_size: u64,
  device: SeekableAsyncFile,
  free_space_gauge: Arc<AtomicU64>,
  garbage_checker: GC,
  journal: Arc<WriteJournal>,
  log_state: Mutex<LogState>,
  padding_indicator: Vec<u8>,
}

impl<GC: GarbageChecker> LogStructured<GC> {
  pub fn new(
    device: SeekableAsyncFile,
    device_offset: u64,
    device_size: u64,
    journal: Arc<WriteJournal>,
    garbage_checker: GC,
    padding_indicator: Vec<u8>,
    free_space_gauge: Arc<AtomicU64>,
  ) -> Self {
    Self {
      device_offset,
      device_size,
      device,
      free_space_gauge,
      garbage_checker,
      journal,
      log_state: Mutex::new(LogState::default()),
      padding_indicator,
    }
  }

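  /// Physical offset at which entry data begins: the region's own offset on
  /// the device plus the persisted state block.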
  fn reserved_size(&self) -> u64 {
    self.device_offset + STATE_SIZE
  }

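  /// Maps a monotonically increasing virtual offset onto the ring; physical
  /// offsets cycle through `[reserved_size(), device_size)`.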
  pub fn physical_offset(&self, virtual_offset: u64) -> u64 {
    self.reserved_size() + (virtual_offset % (self.device_size - self.reserved_size()))
  }

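  /// Reserves `usage` bytes at the tail. If the entry would cross the
  /// physical end of the device, the remainder of the ring is marked with the
  /// padding indicator and the entry is placed at the start of the data area
  /// instead. The bump is not durable until `commit_tail_bump` has been
  /// awaited.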
  pub async fn bump_tail(&self, usage: usize) -> TailBump {
    let usage: u64 = usage.try_into().unwrap();
    assert!(usage > 0);
    if usage > self.device_size - self.device_offset {
      panic!("out of storage space");
    };

    let (physical_offset, new_tail, write_filler_at) = {
      let mut state = self.log_state.lock();
      let mut physical_offset = self.physical_offset(state.tail);
      let mut write_filler_at = None;
      if physical_offset + usage >= self.device_size {
        // The entry would cross the physical end of the device, so pad the
        // rest of the ring and wrap around to the start of the data area.
        write_filler_at = Some(physical_offset);
        let filler = self.device_size - physical_offset;
        physical_offset = self.reserved_size();
        state.tail += filler;
      };

      state.tail += usage;
      let new_tail = state.tail;
      if new_tail - state.head > self.device_size - self.reserved_size() {
        panic!("out of storage space");
      };

      // Register the bump as pending; `commit_tail_bump` will fill in the
      // signal controller once the entry has been written.
      let None = state.pending_tail_bumps.insert(new_tail, None) else {
        unreachable!();
      };
      self
        .free_space_gauge
        .store(state.tail - state.head, Ordering::Relaxed);
      (physical_offset, new_tail, write_filler_at)
    };

    if let Some(write_filler_at) = write_filler_at {
      self
        .device
        .write_at(write_filler_at, self.padding_indicator.clone())
        .await;
    };

    TailBump {
      acquired_physical_offset: physical_offset,
      uncommitted_virtual_offset: new_tail,
    }
  }

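  /// Marks the bump's entry as written and waits for the background commit
  /// loop to persist a tail that covers it.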
  pub async fn commit_tail_bump(&self, bump: TailBump) {
    let (fut, fut_ctl) = SignalFuture::new();

    {
      let mut state = self.log_state.lock();

      *state
        .pending_tail_bumps
        .get_mut(&bump.uncommitted_virtual_offset)
        .unwrap() = Some(fut_ctl.clone());
    };

    fut.await;
  }

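  /// Every 10 seconds, advances the head past any contiguous run of garbage
  /// and padding entries (as classified by the `GarbageChecker`), persisting
  /// the new head through the journal before publishing it in memory.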
  async fn start_background_garbage_collection_loop(&self) {
    loop {
      sleep(std::time::Duration::from_secs(10)).await;

      let (orig_head, tail) = {
        let state = self.log_state.lock();
        (state.head, state.tail_on_disk)
      };
      let mut head = orig_head;
      while head < tail {
        let physical_offset = self.physical_offset(head);
        let res = self.garbage_checker.check_offset(physical_offset).await;
        match res {
          GarbageCheck::IsGarbage(ent_size) => {
            head += ent_size;
          }
          GarbageCheck::IsNotGarbage => {
            break;
          }
          GarbageCheck::IsPadding => {
            // Padding runs to the physical end of the device; skip it all.
            head += self.device_size - physical_offset;
          }
        };
      }
      if head != orig_head {
        let mut txn = self.journal.begin_transaction();
        txn.write(self.device_offset + STATE_OFFSETOF_HEAD, head.to_be_bytes());
        self.journal.commit_transaction(txn).await;
        let mut state = self.log_state.lock();
        state.head = head;
        self
          .free_space_gauge
          .store(state.tail - state.head, Ordering::Relaxed);
      };
    }
  }

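  /// Every 200 microseconds, removes the longest prefix of pending tail bumps
  /// whose entries have all been written, persists the largest such tail
  /// through the journal, and then signals the corresponding waiters in
  /// `commit_tail_bump`. Stopping at the first unwritten bump ensures the
  /// persisted tail never covers an entry that has not been written yet.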
  async fn start_background_tail_bump_commit_loop(&self) {
    loop {
      sleep(std::time::Duration::from_micros(200)).await;

      let mut to_resolve = vec![];
      let mut new_tail_to_write = None;
      {
        let mut state = self.log_state.lock();
        loop {
          let Some(e) = state.pending_tail_bumps.first_entry() else {
            break;
          };
          if e.get().is_none() {
            break;
          };
          let (k, fut_state) = e.remove_entry();
          to_resolve.push(fut_state.unwrap());
          new_tail_to_write = Some(k);
        }
        if let Some(tail) = new_tail_to_write {
          state.tail_on_disk = tail;
        };
      };

      if let Some(new_tail_to_write) = new_tail_to_write {
        let mut txn = self.journal.begin_transaction();
        txn.write(
          self.device_offset + STATE_OFFSETOF_TAIL,
          new_tail_to_write.to_be_bytes(),
        );
        self.journal.commit_transaction(txn).await;

        for ft in to_resolve {
          ft.signal(());
        }
      };
    }
  }

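  /// Runs both background loops; neither ever returns, so poll or spawn this
  /// future for the lifetime of the log.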
  pub async fn start_background_loops(&self) {
    join! {
      self.start_background_garbage_collection_loop(),
      self.start_background_tail_bump_commit_loop(),
    };
  }

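  /// Initialises the persisted state by zeroing the head and tail offsets.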
  pub async fn format_device(&self) {
    self
      .device
      .write_at(
        self.device_offset + STATE_OFFSETOF_HEAD,
        0u64.to_be_bytes().to_vec(),
      )
      .await;
    self
      .device
      .write_at(
        self.device_offset + STATE_OFFSETOF_TAIL,
        0u64.to_be_bytes().to_vec(),
      )
      .await;
  }

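  /// Returns the current in-memory virtual head and tail offsets.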
  pub fn get_head_and_tail(&self) -> (u64, u64) {
    let log_state = self.log_state.lock();
    (log_state.head, log_state.tail)
  }

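  /// Loads the persisted head and tail into memory and updates the space
  /// gauge.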
  pub async fn load_state_from_device(&self) {
    let head = self
      .device
      .read_at(self.device_offset + STATE_OFFSETOF_HEAD, 8)
      .await
      .read_u64_be_at(0);
    let tail = self
      .device
      .read_at(self.device_offset + STATE_OFFSETOF_TAIL, 8)
      .await
      .read_u64_be_at(0);
    self.free_space_gauge.store(tail - head, Ordering::Relaxed);
    {
      let mut log_state = self.log_state.lock();
      log_state.head = head;
      log_state.tail = tail;
      // The tail just read is durable by definition; record it so the
      // garbage collection loop can make progress before any new bump.
      log_state.tail_on_disk = tail;
    };
  }
}