use object_store::path::Path;
use uuid::Uuid;
/// A write-once cell whose value can be observed by any number of readers.
///
/// The cell is backed by a `tokio::sync::watch` channel carrying an
/// `Option<T>`: it starts out as `None` and holds `Some(value)` once a value
/// has been stored.
#[derive(Clone, Debug)]
pub struct WatchableOnceCell<T>
where
    T: Clone + std::fmt::Debug,
{
    /// Receiving half of the channel; cloned to hand out readers.
    rx: tokio::sync::watch::Receiver<Option<T>>,
    /// Sending half of the channel; used to store the value.
    tx: tokio::sync::watch::Sender<Option<T>>,
}
/// Read-only handle onto a [`WatchableOnceCell`].
///
/// Holds only the receiving half of the underlying watch channel, so it can
/// observe or await the value but cannot store one.
#[derive(Clone, Debug)]
pub struct WatchableOnceCellReader<T>
where
    T: Clone + std::fmt::Debug,
{
    /// Receiving half of the watch channel shared with the cell.
    rx: tokio::sync::watch::Receiver<Option<T>>,
}
impl<T> WatchableOnceCell<T>
where
    T: Clone + std::fmt::Debug,
{
    /// Creates an empty cell; the underlying watch channel starts at `None`.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::watch::channel(None);
        Self {
            rx: receiver,
            tx: sender,
        }
    }

    /// Stores `val` if the cell is still empty.
    ///
    /// When a value is already present the call is a no-op: the existing
    /// value is kept and the channel is not marked as modified.
    pub fn write(&self, val: T) {
        self.tx.send_if_modified(|slot| {
            if slot.is_none() {
                // First write wins; report the modification to the channel.
                *slot = Some(val);
                true
            } else {
                false
            }
        });
    }

    /// Returns a new reader that shares this cell's watch channel.
    pub fn reader(&self) -> WatchableOnceCellReader<T> {
        let rx = self.rx.clone();
        WatchableOnceCellReader { rx }
    }
}
impl<T: Clone + std::fmt::Debug> Default for WatchableOnceCell<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> WatchableOnceCellReader<T>
where
    T: Clone + std::fmt::Debug,
{
    /// Returns a clone of the current value, or `None` if nothing has been
    /// stored yet.
    pub fn read(&self) -> Option<T> {
        let current = self.rx.borrow();
        (*current).clone()
    }

    /// Waits until the cell holds a value and returns a clone of it.
    ///
    /// # Panics
    ///
    /// Panics with "watch channel closed" if waiting on the channel returns
    /// an error, and with "no value found" if the observed slot is empty.
    pub async fn await_value(&mut self) -> T {
        let filled = self
            .rx
            .wait_for(|slot| slot.is_some())
            .await
            .expect("watch channel closed");
        (*filled).clone().expect("no value found")
    }
}
/// Reverses the order of all 64 bits in `n`, so the least significant bit
/// becomes the most significant one and vice versa.
///
/// Applying the function twice yields the original value.
pub fn bit_reverse_u64(n: u64) -> u64 {
    u64::reverse_bits(n)
}
/// Builds a filename of the form `<64 binary digits>.<ext>`.
///
/// The digits are the zero-padded binary representation of the bit-reversed
/// `id`; `ext` is appended after a single dot.
pub fn bit_reversed_filename(id: u64, ext: &str) -> String {
    let reversed = bit_reverse_u64(id);
    format!("{:064b}.{}", reversed, ext)
}
pub fn parse_bit_reversed_filename(filename: &str) -> Option<u64> {
let stem = filename.split('.').next()?;
if stem.len() != 64 || !stem.chars().all(|c| c == '0' || c == '1') {
return None;
}
let reversed = u64::from_str_radix(stem, 2).ok()?;
Some(bit_reverse_u64(reversed))
}
/// Returns the directory for a region: `<base_path>/_mem_wal/<region_id>`,
/// where the region id is rendered in hyphenated form.
pub fn region_base_path(base_path: &Path, region_id: &Uuid) -> Path {
    let mem_wal_root = base_path.child("_mem_wal");
    let region_dir = region_id.as_hyphenated().to_string();
    mem_wal_root.child(region_dir)
}
/// Returns the `wal` child of the region's base path.
pub fn region_wal_path(base_path: &Path, region_id: &Uuid) -> Path {
    let region_root = region_base_path(base_path, region_id);
    region_root.child("wal")
}
/// Returns the `manifest` child of the region's base path.
pub fn region_manifest_path(base_path: &Path, region_id: &Uuid) -> Path {
    let region_root = region_base_path(base_path, region_id);
    region_root.child("manifest")
}
/// Returns the path of a flushed memtable inside a region:
/// `<region base path>/<random_hash>_gen_<generation>`.
pub fn flushed_memtable_path(
    base_path: &Path,
    region_id: &Uuid,
    random_hash: &str,
    generation: u64,
) -> Path {
    let dir_name = format!("{}_gen_{}", random_hash, generation);
    let region_root = region_base_path(base_path, region_id);
    region_root.child(dir_name)
}
/// Generates an 8-character lowercase hexadecimal string from four random
/// bytes, each rendered as two zero-padded hex digits.
pub fn generate_random_hash() -> String {
    let raw: [u8; 4] = rand::random();
    raw.iter().map(|byte| format!("{:02x}", byte)).collect()
}
/// Returns the bit-reversed filename for a WAL entry position, using the
/// `arrow` extension.
pub fn wal_entry_filename(wal_entry_position: u64) -> String {
    let extension = "arrow";
    bit_reversed_filename(wal_entry_position, extension)
}
/// Returns the bit-reversed filename for a manifest version, using the
/// `binpb` extension.
pub fn manifest_filename(version: u64) -> String {
    let extension = "binpb";
    bit_reversed_filename(version, extension)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bit reversal maps known inputs to known outputs and is its own inverse.
    #[test]
    fn test_bit_reverse_u64() {
        assert_eq!(bit_reverse_u64(0), 0);
        assert_eq!(bit_reverse_u64(1), 0x8000000000000000);
        assert_eq!(bit_reverse_u64(5), 0xa000000000000000);
        for i in [0u64, 1, 2, 5, 100, 1000, u64::MAX / 2, u64::MAX] {
            assert_eq!(bit_reverse_u64(bit_reverse_u64(i)), i);
        }
    }

    /// Filenames are 64 binary digits of the reversed id plus the extension.
    #[test]
    fn test_bit_reversed_filename() {
        let filename = bit_reversed_filename(1, "binpb");
        assert_eq!(
            filename,
            "1000000000000000000000000000000000000000000000000000000000000000.binpb"
        );
        let filename = bit_reversed_filename(5, "lance");
        assert_eq!(
            filename,
            "1010000000000000000000000000000000000000000000000000000000000000.lance"
        );
    }

    /// Parsing round-trips generated filenames and rejects malformed stems.
    #[test]
    fn test_parse_bit_reversed_filename() {
        for id in [1u64, 5, 100, 1000, u64::MAX / 2] {
            let filename = bit_reversed_filename(id, "binpb");
            let parsed = parse_bit_reversed_filename(&filename);
            assert_eq!(parsed, Some(id), "Failed round-trip for id={}", id);
        }
        // Wrong length and non-binary characters.
        assert_eq!(parse_bit_reversed_filename("invalid"), None);
        assert_eq!(parse_bit_reversed_filename("123.binpb"), None);
        // 65-character stem that also ends in a non-binary digit.
        assert_eq!(
            parse_bit_reversed_filename(
                "10100000000000000000000000000000000000000000000000000000000000002.binpb"
            ),
            None
        );
    }

    /// Region path helpers compose the expected directory layout.
    #[test]
    fn test_region_paths() {
        let base_path = Path::from("my/dataset");
        let region_id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
        assert_eq!(
            region_base_path(&base_path, &region_id).as_ref(),
            "my/dataset/_mem_wal/550e8400-e29b-41d4-a716-446655440000"
        );
        assert_eq!(
            region_wal_path(&base_path, &region_id).as_ref(),
            "my/dataset/_mem_wal/550e8400-e29b-41d4-a716-446655440000/wal"
        );
        assert_eq!(
            region_manifest_path(&base_path, &region_id).as_ref(),
            "my/dataset/_mem_wal/550e8400-e29b-41d4-a716-446655440000/manifest"
        );
        assert_eq!(
            flushed_memtable_path(&base_path, &region_id, "a1b2c3d4", 5).as_ref(),
            "my/dataset/_mem_wal/550e8400-e29b-41d4-a716-446655440000/a1b2c3d4_gen_5"
        );
        // An empty base path yields a path rooted directly at `_mem_wal`.
        let empty_base = Path::from("");
        assert_eq!(
            region_wal_path(&empty_base, &region_id).as_ref(),
            "_mem_wal/550e8400-e29b-41d4-a716-446655440000/wal"
        );
    }

    /// Random hashes are 8 hex characters.
    #[test]
    fn test_generate_random_hash() {
        let hash = generate_random_hash();
        assert_eq!(hash.len(), 8);
        assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        // NOTE(review): probabilistic check; two 4-byte draws could in
        // principle collide, though that should be vanishingly rare.
        let hash2 = generate_random_hash();
        assert_ne!(hash, hash2);
    }

    /// Only the first write is retained; later writes are ignored.
    #[tokio::test]
    async fn test_watchable_once_cell_write_once() {
        let cell = WatchableOnceCell::new();
        let reader = cell.reader();
        assert_eq!(reader.read(), None);
        cell.write(42);
        assert_eq!(reader.read(), Some(42));
        cell.write(100);
        assert_eq!(reader.read(), Some(42));
    }

    /// A reader awaiting the value is woken once the value is written.
    #[tokio::test]
    async fn test_watchable_once_cell_await() {
        let cell = WatchableOnceCell::new();
        let mut reader = cell.reader();
        let handle = tokio::spawn(async move { reader.await_value().await });
        // Give the spawned task a chance to start waiting before the write.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        cell.write(123);
        let result = handle.await.unwrap();
        assert_eq!(result, 123);
    }

    /// Every waiting reader observes the same written value.
    #[tokio::test]
    async fn test_watchable_once_cell_multiple_readers() {
        let cell = WatchableOnceCell::new();
        let mut reader1 = cell.reader();
        let mut reader2 = cell.reader();
        let h1 = tokio::spawn(async move { reader1.await_value().await });
        let h2 = tokio::spawn(async move { reader2.await_value().await });
        // Give both spawned tasks a chance to start waiting before the write.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        cell.write(456);
        assert_eq!(h1.await.unwrap(), 456);
        assert_eq!(h2.await.unwrap(), 456);
    }
}