use crate::ctx::Ctx;
use crate::ctx::State;
use crate::metrics::BlobdMetrics;
use crate::object::OBJECT_OFF;
use crate::page::ObjectPageHeader;
use crate::page::ObjectState;
use crate::page::Pages;
#[cfg(test)]
use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
use crate::util::get_now_sec;
use off64::int::create_u48_be;
use off64::int::Off64AsyncReadInt;
use off64::usz;
#[cfg(not(test))]
use seekable_async_file::SeekableAsyncFile;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[cfg(not(test))]
use write_journal::Transaction;
/// Width in bytes of one persisted list pointer. `head` and `tail` are encoded with
/// `create_u48_be` and decoded with `read_u48_be_at`, i.e. as 48-bit (6-byte) integers.
const U48_BYTES: u64 = 6;
/// Offset of the head pointer, relative to the start of this list's state region.
const OFFSETOF_HEAD: u64 = 0;
/// Offset of the tail pointer, relative to the start of this list's state region.
///
/// This used to be `OFFSETOF_HEAD + 5`, which made the last byte of the 6-byte head and the first
/// byte of the 6-byte tail share one device byte, so writing either field clobbered the other.
// NOTE(review): this moves the tail field and grows the state region by two bytes, so the
// on-device layout changes. Devices formatted with the old offsets need reformatting or a
// migration — confirm before shipping.
const OFFSETOF_TAIL: u64 = OFFSETOF_HEAD + U48_BYTES;
/// Total number of bytes occupied by the persisted list state (head followed by tail).
pub(crate) const INCOMPLETE_LIST_STATE_SIZE: u64 = OFFSETOF_TAIL + U48_BYTES;
/// In-memory handle for a doubly-linked list of object pages whose headers are written with
/// `ObjectState::Incomplete`.
///
/// The head and tail pointers are persisted on the device (relative to `dev_offset`) and mirrored
/// in the `head` and `tail` fields. A pointer value of 0 is treated as "no page".
pub(crate) struct IncompleteList {
/// Device offset of this list's persisted state; the `OFFSETOF_*` constants are added to it.
dev_offset: u64,
/// Device handle, read directly for the head/tail pointers and for an object's creation time.
dev: SeekableAsyncFile,
/// Device offset of the first page in the list, or 0 when the list is empty.
head: u64,
/// Metrics sink; the incomplete-object counter is bumped on attach and lowered on detach.
metrics: Arc<BlobdMetrics>,
/// Page-header reader/writer used to maintain the `prev`/`next` links.
pages: Arc<Pages>,
/// Configured age in seconds; the reaper adds it (plus 3600) to an object's creation time to
/// decide when the object may be reaped.
reap_objects_after_secs: u64,
/// Device offset of the last page in the list, or 0 when the list is empty.
tail: u64,
}
impl IncompleteList {
    /// Builds the in-memory list from the state persisted on `dev`.
    ///
    /// The head and tail pointers are read as big-endian u48 values at
    /// `dev_offset + OFFSETOF_HEAD` and `dev_offset + OFFSETOF_TAIL`; all other arguments are
    /// stored as given.
    pub async fn load_from_device(
        dev: SeekableAsyncFile,
        dev_offset: u64,
        pages: Arc<Pages>,
        metrics: Arc<BlobdMetrics>,
        reap_objects_after_secs: u64,
    ) -> Self {
        let head_at = dev_offset + OFFSETOF_HEAD;
        let head = dev.read_u48_be_at(head_at).await;
        let tail_at = dev_offset + OFFSETOF_TAIL;
        let tail = dev.read_u48_be_at(tail_at).await;
        Self {
            dev,
            dev_offset,
            metrics,
            pages,
            reap_objects_after_secs,
            head,
            tail,
        }
    }

    /// Initialises the persisted state by writing `INCOMPLETE_LIST_STATE_SIZE` zero bytes at
    /// `dev_offset`. Zero pointers are what the rest of this type treats as an empty list.
    pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64) {
        dev
            .write_at(dev_offset, vec![0u8; usz!(INCOMPLETE_LIST_STATE_SIZE)])
            .await;
    }

    /// Records a write of the new head pointer in `txn` and mirrors it in memory.
    fn update_head(&mut self, txn: &mut Transaction, page_dev_offset: u64) {
        let field_at = self.dev_offset + OFFSETOF_HEAD;
        let encoded = create_u48_be(page_dev_offset);
        txn.write(field_at, encoded);
        self.head = page_dev_offset;
    }

    /// Records a write of the new tail pointer in `txn` and mirrors it in memory.
    fn update_tail(&mut self, txn: &mut Transaction, page_dev_offset: u64) {
        let field_at = self.dev_offset + OFFSETOF_TAIL;
        let encoded = create_u48_be(page_dev_offset);
        txn.write(field_at, encoded);
        self.tail = page_dev_offset;
    }

    /// Appends the page at `page_dev_offset` to the end of the list.
    ///
    /// Increments the incomplete-object metric, writes a fresh `Incomplete` header for the page
    /// (with `page_size_pow2` stored as `metadata_size_pow2`), links it after the current tail if
    /// there is one, and makes it the new tail. It also becomes the head when the list was empty.
    pub async fn attach(&mut self, txn: &mut Transaction, page_dev_offset: u64, page_size_pow2: u8) {
        self.metrics.incr_incomplete_object_count(txn, 1);

        let old_tail = self.tail;
        let new_header = ObjectPageHeader {
            deleted_sec: None,
            metadata_size_pow2: page_size_pow2,
            next: 0,
            prev: old_tail,
            state: ObjectState::Incomplete,
        };
        self.pages.write_page_header(txn, page_dev_offset, new_header);

        // Empty list: the new page is the head as well as the tail.
        if self.head == 0 {
            self.update_head(txn, page_dev_offset);
        }

        // Non-empty list: point the previous tail forward at the new page.
        if old_tail != 0 {
            self
                .pages
                .update_page_header::<ObjectPageHeader>(txn, old_tail, |tail_hdr| {
                    debug_assert_eq!(tail_hdr.state, ObjectState::Incomplete);
                    debug_assert_eq!(tail_hdr.deleted_sec, None);
                    tail_hdr.next = page_dev_offset;
                })
                .await;
        }

        self.update_tail(txn, page_dev_offset);
    }

    /// Unlinks the page at `page_dev_offset` from the list.
    ///
    /// Decrements the incomplete-object metric, reads the page's header, and joins its neighbours
    /// (or the head/tail pointers, where a neighbour is absent) to each other. The detached page's
    /// own header is not rewritten here.
    pub async fn detach(&mut self, txn: &mut Transaction, page_dev_offset: u64) {
        self.metrics.decr_incomplete_object_count(txn, 1);

        let detached = self
            .pages
            .read_page_header::<ObjectPageHeader>(page_dev_offset)
            .await;
        let before = detached.prev;
        let after = detached.next;

        // Fix up whatever pointed forward at the detached page.
        match before {
            0 => self.update_head(txn, after),
            _ => {
                self
                    .pages
                    .update_page_header::<ObjectPageHeader>(txn, before, |prev_hdr| {
                        debug_assert_eq!(prev_hdr.state, ObjectState::Incomplete);
                        debug_assert_eq!(prev_hdr.deleted_sec, None);
                        prev_hdr.next = after;
                    })
                    .await;
            }
        }

        // Fix up whatever pointed backward at the detached page.
        match after {
            0 => self.update_tail(txn, before),
            _ => {
                self
                    .pages
                    .update_page_header::<ObjectPageHeader>(txn, after, |next_hdr| {
                        debug_assert_eq!(next_hdr.state, ObjectState::Incomplete);
                        debug_assert_eq!(next_hdr.deleted_sec, None);
                        next_hdr.prev = before;
                    })
                    .await;
            }
        }
    }
}
/// Moves the object at the head of the incomplete list to the deleted list if it is old enough.
///
/// Only the head page is examined, so at most one object is reaped per call.
///
/// # Returns
///
/// - `Ok(())` when the head object was detached from the incomplete list and attached to the
///   deleted list, with both changes recorded in `txn`.
/// - `Err(secs)` when nothing was reaped, where `secs` is how long the caller should wait before
///   trying again: 3600 when the list is empty, otherwise the time left until the head object
///   reaches its reap time.
pub(crate) async fn maybe_reap_next_incomplete(
    state: &mut State,
    txn: &mut Transaction,
) -> Result<(), u64> {
    /// Retry delay reported when the list is empty.
    const EMPTY_LIST_RETRY_SECS: u64 = 3600;
    /// Extra seconds added on top of the configured age before an object is reaped.
    const EXTRA_AGE_SECS: u64 = 3600;

    let page_dev_offset = state.incomplete_list.head;
    if page_dev_offset == 0 {
        return Err(EMPTY_LIST_RETRY_SECS);
    }

    // The creation timestamp is read from the object's `created_ms` field and reduced to seconds.
    let created_sec = state
        .incomplete_list
        .dev
        .read_u48_be_at(page_dev_offset + OBJECT_OFF.created_ms())
        .await
        / 1000;
    let now = get_now_sec();

    // Saturate instead of overflowing: with a very large configured age, a wrapped sum would make
    // the object look overdue and reap it immediately (or panic in debug builds).
    let reap_time = created_sec
        .saturating_add(state.incomplete_list.reap_objects_after_secs)
        .saturating_add(EXTRA_AGE_SECS);
    if now < reap_time {
        return Err(reap_time - now);
    }

    state.incomplete_list.detach(txn, page_dev_offset).await;
    state.deleted_list.attach(txn, page_dev_offset).await;
    Ok(())
}
/// Runs the incomplete-list reaper forever.
///
/// Each iteration locks the shared state, begins a journal transaction, and attempts to reap the
/// head of the incomplete list. The state guard goes out of scope before the transaction is
/// committed. The loop then sleeps for the number of seconds the reap attempt asked for, or not
/// at all when an object was reaped.
pub(crate) async fn start_incomplete_list_reaper_background_loop(ctx: Arc<Ctx>) {
    loop {
        let (txn, wait_secs) = {
            let mut guard = ctx.state.lock().await;
            let mut pending = ctx.journal.begin_transaction();
            let outcome = maybe_reap_next_incomplete(&mut guard, &mut pending).await;
            let wait_secs = match outcome {
                // Something was reaped: go straight on to the next candidate.
                Ok(()) => 0,
                Err(secs) => secs,
            };
            (pending, wait_secs)
        };
        ctx.journal.commit_transaction(txn).await;
        sleep(Duration::from_secs(wait_secs)).await;
    }
}