1use crate::ctx::State;
2use crate::object::OBJECT_KEY_LEN_MAX;
3use crate::object::OBJECT_OFF;
4use crate::page::ObjectPageHeader;
5use crate::page::ObjectState;
6use crate::page::Pages;
7use crate::page::MIN_PAGE_SIZE_POW2;
8use crate::stream::CreatedStreamEvent;
9use crate::stream::StreamEvent;
10use crate::stream::StreamEventType;
11#[cfg(test)]
12use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
13#[cfg(test)]
14use crate::test_util::journal::TestTransaction as Transaction;
15#[cfg(test)]
16use crate::test_util::journal::TestWriteJournal as WriteJournal;
17use itertools::Itertools;
18use off64::int::create_u40_be;
19use off64::int::Off64ReadInt;
20use off64::u16;
21use off64::usz;
22use off64::Off64Read;
23#[cfg(not(test))]
24use seekable_async_file::SeekableAsyncFile;
25use std::ops::Deref;
26use std::sync::Arc;
27use tokio::join;
28use tokio::sync::RwLock;
29use tokio::sync::RwLockReadGuard;
30use tokio::sync::RwLockWriteGuard;
31use tracing::debug;
32use twox_hash::xxh3::hash64;
33#[cfg(not(test))]
34use write_journal::Transaction;
35#[cfg(not(test))]
36use write_journal::WriteJournal;
37
/// Byte offset, within the buckets region, of the `count_log2` header byte
/// (log2 of the bucket count; see `Buckets::format_device`/`load_from_device`).
pub(crate) const BUCKETS_OFFSETOF_COUNT_LOG2: u64 = 0;

/// Byte offset, within the buckets region, of bucket `bkt_id`'s head entry.
/// Each entry is a 5-byte (u40) page-shifted device offset.
///
/// The `+ 1` skips past the single `count_log2` header byte. Without it,
/// bucket 0's entry would alias the count byte written by `format_device`
/// at offset `BUCKETS_OFFSETOF_COUNT_LOG2`, and `mutate_head` on bucket 0
/// would clobber the bucket count that `load_from_device` depends on.
#[allow(non_snake_case)]
pub(crate) fn BUCKETS_OFFSETOF_BUCKET(bkt_id: u64) -> u64 {
  BUCKETS_OFFSETOF_COUNT_LOG2 + 1 + (bkt_id * 5)
}

/// Total byte size of a buckets region holding `bkt_cnt` buckets: the
/// header byte plus all entries, i.e. the offset just past the last entry.
#[allow(non_snake_case)]
pub(crate) fn BUCKETS_SIZE(bkt_cnt: u64) -> u64 {
  BUCKETS_OFFSETOF_BUCKET(bkt_cnt)
}
66
/// Result of a successful lookup in a bucket's linked chain of objects
/// (see `ReadableLockedBucket::find_object`).
pub(crate) struct FoundObject {
  // Device offset of the previous object in the chain, if the found
  // object was not the head.
  pub prev_dev_offset: Option<u64>,
  // Device offset of the next object in the chain, if any (a stored 0
  // terminator is mapped to `None`).
  pub next_dev_offset: Option<u64>,
  // Device offset of the found object itself.
  pub dev_offset: u64,
  // Object ID read from the object's on-device metadata.
  pub id: u64,
  // Object size read as a u40 from the object's on-device metadata.
  pub size: u64,
}
74
/// Read-only view over a single bucket. Not usable on its own: it is always
/// wrapped by `BucketReadLocked` or `BucketWriteLocked`, which hold the
/// corresponding guard on the bucket's lock shard and `Deref` to this type.
pub(crate) struct ReadableLockedBucket<'b, 'k> {
  bucket_id: u64,
  buckets: &'b Buckets,
  // Cached `key.len()` as u16, compared against the on-device key length.
  key_len: u16,
  key: &'k [u8],
}
82
impl<'b, 'k> ReadableLockedBucket<'b, 'k> {
  /// ID of the bucket this view points at.
  pub fn bucket_id(&self) -> u64 {
    self.bucket_id
  }

  /// Walks this bucket's linked chain of objects looking for one whose key
  /// equals `self.key` and, when `expected_id` is provided, whose object ID
  /// matches it. Returns `None` if no such object is linked in the chain.
  pub async fn find_object(&self, expected_id: Option<u64>) -> Option<FoundObject> {
    let mut dev_offset = self.get_head().await;
    let mut prev_dev_offset = None;
    // A head/next offset of 0 terminates the chain.
    while dev_offset > 0 {
      // Concurrently fetch the page header and enough raw bytes to cover the
      // object metadata up to the maximum possible key length.
      let (hdr, raw) = join! {
        self.buckets.pages.read_page_header::<ObjectPageHeader>(dev_offset),
        self.buckets.dev.read_at(dev_offset, OBJECT_OFF.with_key_len(OBJECT_KEY_LEN_MAX).lpages()),
      };
      // Invariant: only committed, non-deleted objects are linked into a
      // bucket chain.
      debug_assert_eq!(hdr.state, ObjectState::Committed);
      debug_assert_eq!(hdr.deleted_sec, None);
      let object_id = raw.read_u64_be_at(OBJECT_OFF.id());
      // Compare length before key bytes so a cheap mismatch short-circuits.
      if (expected_id.is_none() || expected_id.unwrap() == object_id)
        && raw.read_u16_be_at(OBJECT_OFF.key_len()) == self.key_len
        && raw.read_at(OBJECT_OFF.key(), self.key_len.into()) == self.key
      {
        return Some(FoundObject {
          dev_offset,
          // Map the 0 "no next" sentinel to None.
          next_dev_offset: Some(hdr.next).filter(|o| *o > 0),
          id: object_id,
          prev_dev_offset,
          size: raw.read_u40_be_at(OBJECT_OFF.size()),
        });
      };
      prev_dev_offset = Some(dev_offset);
      dev_offset = hdr.next;
    }
    None
  }

  /// Reads this bucket's head pointer: the device offset of the first object
  /// in its chain, or 0 if the bucket is empty. The value is stored on disk
  /// as a 5-byte (u40) offset in page units and shifted back into a byte
  /// offset here (mirroring the `>>` in `BucketWriteLocked::mutate_head`).
  /// Read through the journal overlay — presumably so writes staged in
  /// not-yet-flushed transactions are observed; confirm against the
  /// journal's contract.
  pub async fn get_head(&self) -> u64 {
    self
      .buckets
      .journal
      .read_with_overlay(
        self.buckets.dev_offset + BUCKETS_OFFSETOF_BUCKET(self.bucket_id),
        5,
      )
      .await
      .read_u40_be_at(0)
      << MIN_PAGE_SIZE_POW2
  }
}
130}
131
/// A bucket view paired with a held read guard on the bucket's lock shard.
/// `Deref`s to `ReadableLockedBucket` for the read-only API.
pub(crate) struct BucketReadLocked<'b, 'k, 'l> {
  state: ReadableLockedBucket<'b, 'k>,
  // Never read; held only so the shard stays read-locked for this value's
  // lifetime.
  #[allow(unused)]
  lock: RwLockReadGuard<'l, ()>,
}
138
// Expose the read-only bucket API directly on the lock-holding wrapper.
impl<'b, 'k, 'l> Deref for BucketReadLocked<'b, 'k, 'l> {
  type Target = ReadableLockedBucket<'b, 'k>;

  fn deref(&self) -> &Self::Target {
    &self.state
  }
}
146
/// A bucket view paired with a held write guard on the bucket's lock shard,
/// granting exclusive mutation rights. `Deref`s to `ReadableLockedBucket`
/// for the read-only API.
pub(crate) struct BucketWriteLocked<'b, 'k, 'l> {
  state: ReadableLockedBucket<'b, 'k>,
  // Never read; held only so the shard stays write-locked for this value's
  // lifetime.
  #[allow(unused)]
  lock: RwLockWriteGuard<'l, ()>,
}
154
impl<'b, 'k, 'l> BucketWriteLocked<'b, 'k, 'l> {
  /// If an object matching this view's key (and, when given, the object ID
  /// `id`) exists in the bucket chain: unlinks it from the chain, attaches
  /// it to the deleted list, and records an `ObjectDelete` stream event —
  /// all staged via `txn`. Returns the created event, or `None` if no
  /// matching object was found.
  pub async fn move_object_to_deleted_list_if_exists(
    &mut self,
    txn: &mut Transaction,
    state: &mut State,
    id: Option<u64>,
  ) -> Option<CreatedStreamEvent> {
    let Some(FoundObject {
      prev_dev_offset: prev_obj,
      next_dev_offset: next_obj,
      dev_offset: obj_dev_offset,
      id: object_id,
      ..
    }) = self.find_object(id).await else {
      return None;
    };

    // Unlink: point the predecessor (or, for the chain head, the bucket's
    // head pointer) past the object. 0 encodes "end of chain".
    match prev_obj {
      Some(prev_inode_dev_offset) => {
        self
          .buckets
          .pages
          .update_page_header::<ObjectPageHeader>(txn, prev_inode_dev_offset, |p| {
            // Predecessor must itself still be a live committed object.
            debug_assert_eq!(p.state, ObjectState::Committed);
            debug_assert_eq!(p.deleted_sec, None);
            p.next = next_obj.unwrap_or(0);
          })
          .await;
      }
      None => {
        // The object was first in the chain; rewrite the bucket head.
        self.mutate_head(txn, next_obj.unwrap_or(0));
      }
    };

    state.deleted_list.attach(txn, obj_dev_offset).await;

    let e = state.stream.create_event_on_device(txn, StreamEvent {
      typ: StreamEventType::ObjectDelete,
      bucket_id: self.state.bucket_id,
      object_id,
    });

    Some(e)
  }

  /// Stages (via `txn`) a new head pointer for this bucket. The offset is
  /// stored as a 5-byte (u40) value in page units — the `>>` discards the
  /// low bits, so `dev_offset` is assumed to be page-aligned (the matching
  /// `<<` is in `ReadableLockedBucket::get_head`).
  pub fn mutate_head(&mut self, txn: &mut Transaction, dev_offset: u64) {
    txn.write_with_overlay(
      self.buckets.dev_offset + BUCKETS_OFFSETOF_BUCKET(self.bucket_id),
      create_u40_be(dev_offset >> MIN_PAGE_SIZE_POW2),
    );
  }
}
213
// A write-locked bucket can also perform all read-only operations.
impl<'b, 'k, 'l> Deref for BucketWriteLocked<'b, 'k, 'l> {
  type Target = ReadableLockedBucket<'b, 'k>;

  fn deref(&self) -> &Self::Target {
    &self.state
  }
}
221
/// The on-device hash index mapping object keys to per-bucket linked chains
/// of objects. Buckets are guarded by a smaller array of sharded locks
/// (several buckets share one lock).
pub(crate) struct Buckets {
  // log2 of the bucket count, read from the region's first byte on load.
  bucket_count_pow2: u8,
  // log2 of the lock-shard count. The shard-mapping shift in
  // `bucket_lock_id_for_bucket_id` assumes this is <= `bucket_count_pow2`.
  bucket_lock_count_pow2: u8,
  bucket_locks: Vec<RwLock<()>>,
  // Device offset of the start of the buckets region.
  dev_offset: u64,
  dev: SeekableAsyncFile,
  journal: Arc<WriteJournal>,
  pages: Arc<Pages>,
}
231
232impl Buckets {
233 pub async fn load_from_device(
234 dev: SeekableAsyncFile,
235 journal: Arc<WriteJournal>,
236 pages: Arc<Pages>,
237 dev_offset: u64,
238 bucket_lock_count_pow2: u8,
239 ) -> Buckets {
240 let bucket_count_pow2 = dev.read_at(dev_offset, 1).await[0];
241 let bucket_locks = (0..1 << bucket_lock_count_pow2)
242 .map(|_| RwLock::new(()))
243 .collect_vec();
244 debug!(
245 bucket_count = 1 << bucket_count_pow2,
246 bucket_lock_count = 1 << bucket_lock_count_pow2,
247 "buckets loaded"
248 );
249 Buckets {
250 bucket_count_pow2,
251 bucket_lock_count_pow2,
252 bucket_locks,
253 dev_offset,
254 dev,
255 journal,
256 pages,
257 }
258 }
259
260 pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64, bucket_count_pow2: u8) {
261 let mut raw = vec![0u8; usz!(BUCKETS_SIZE(1 << bucket_count_pow2))];
262 raw[usz!(BUCKETS_OFFSETOF_COUNT_LOG2)] = bucket_count_pow2;
263 dev.write_at(dev_offset, raw).await;
264 }
265
266 fn bucket_id_for_key(&self, key: &[u8]) -> u64 {
267 hash64(key) >> (64 - self.bucket_count_pow2)
268 }
269
270 fn bucket_lock_id_for_bucket_id(&self, bkt_id: u64) -> usize {
271 usz!(bkt_id >> (self.bucket_count_pow2 - self.bucket_lock_count_pow2))
272 }
273
274 fn build_readable_locked_bucket<'b, 'k>(&'b self, key: &'k [u8]) -> ReadableLockedBucket<'b, 'k> {
275 let bucket_id = self.bucket_id_for_key(key);
276 ReadableLockedBucket {
277 bucket_id,
278 buckets: self,
279 key,
280 key_len: u16!(key.len()),
281 }
282 }
283
284 pub async fn get_bucket_for_key<'b, 'k>(&'b self, key: &'k [u8]) -> BucketReadLocked<'b, 'k, '_> {
285 let state = self.build_readable_locked_bucket(key);
286 let lock = self.bucket_locks[self.bucket_lock_id_for_bucket_id(state.bucket_id)]
287 .read()
288 .await;
289 BucketReadLocked { state, lock }
290 }
291
292 pub async fn get_bucket_mut_for_key<'b, 'k>(
293 &'b self,
294 key: &'k [u8],
295 ) -> BucketWriteLocked<'b, 'k, '_> {
296 let state = self.build_readable_locked_bucket(key);
297 let lock = self.bucket_locks[self.bucket_lock_id_for_bucket_id(state.bucket_id)]
298 .write()
299 .await;
300 BucketWriteLocked { state, lock }
301 }
302}