use core::fmt;
use std::error;
use foundationdb::{future::FdbValue, tuple::Subspace, Database, FdbBindingError, RangeOption};
use futures::StreamExt;
use rand::{rngs::SmallRng, RngCore, SeedableRng};
/// Issues a `clear_subspace_range` for `subspace` inside a `db.run` transaction closure.
///
/// # Errors
///
/// Returns whatever error `db.run` reports for the transaction.
async fn clear_subspace(db: &Database, subspace: &Subspace) -> Result<(), FdbBindingError> {
    db.run(|txn, _maybe_committed| async move {
        // The only work in this transaction is the range clear; nothing is read back.
        txn.clear_subspace_range(subspace);
        Ok(())
    })
    .await
}
/// Error used by `MicroQueue::enqueue` when the next queue index does not fit in a `usize`.
#[derive(Debug)]
pub struct Overflow;

impl fmt::Display for Overflow {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "queue is too long")
    }
}

// No `source` override: the overflow has no underlying cause to expose.
impl error::Error for Overflow {}
/// A small queue of string values stored under one subspace of a FoundationDB database.
pub struct MicroQueue {
    /// Database handle every queue operation runs its transactions against.
    db: Database,
    /// Subspace that holds all of the queue's entries.
    queue: Subspace,
    /// Source of the random bytes appended to each entry key by `enqueue`.
    rng: SmallRng,
}
impl MicroQueue {
    /// Creates a queue whose entries live under `prefix` in `db`.
    ///
    /// The subspace is cleared before the queue is returned, so any keys already stored
    /// under `prefix` are discarded.
    ///
    /// # Errors
    ///
    /// Returns the error of the clearing transaction if it fails.
    pub async fn new(
        db: Database,
        prefix: impl Into<Vec<u8>> + Send + Sync + Unpin,
    ) -> Result<Self, FdbBindingError> {
        let queue = Subspace::from_bytes(prefix);
        clear_subspace(&db, &queue).await?;
        Ok(Self {
            db,
            queue,
            rng: SmallRng::from_os_rng(),
        })
    }

    /// Returns the number of key-value pairs currently stored in the queue subspace.
    ///
    /// `enqueue` uses this count (plus one) as the index of the next entry.
    ///
    /// NOTE(review): because this is a count rather than the index of the last stored key,
    /// the value shrinks after a `dequeue`, so a later `enqueue` can reuse an index that is
    /// still present in the queue. Confirm whether strict FIFO order is required when
    /// enqueues and dequeues are interleaved.
    async fn last_index(&self) -> Result<usize, FdbBindingError> {
        self.db
            .run(|trx, _maybe_committed| async move {
                Ok(trx
                    .get_ranges_keyvalues(self.queue.range().into(), true)
                    .count()
                    .await)
            })
            .await
    }

    /// Appends `value` to the queue.
    ///
    /// The entry key is the queue subspace extended with the tuple
    /// `(index, 20 random bytes)`, where `index` is the current entry count plus one.
    ///
    /// # Errors
    ///
    /// Returns a custom error wrapping [`Overflow`] if the index does not fit in a `usize`,
    /// or the error of either transaction (count, then write) if one fails.
    pub async fn enqueue(&mut self, value: &str) -> Result<(), FdbBindingError> {
        let index = self
            .last_index()
            .await?
            .checked_add(1)
            .ok_or_else(|| FdbBindingError::new_custom_error(Box::new(Overflow)))?;

        // Presumably the random suffix keeps keys distinct when two producers compute the
        // same index; the count and the write below run in separate transactions.
        let mut rands = [0_u8; 20];
        self.rng.fill_bytes(&mut rands);

        let key = &self.queue.subspace(&(index, rands.as_slice()));
        self.db
            .run(|trx, _maybe_committed| async move {
                trx.set(key.bytes(), value.as_bytes());
                Ok(())
            })
            .await
    }

    /// Returns the entry at the head of the queue, or `None` when the queue is empty.
    ///
    /// The subspace is scanned in forward key order so the entry with the smallest key
    /// (the lowest index) is returned. Scanning in reverse would hand back the most recently
    /// enqueued entry and turn the queue into a stack.
    async fn first_item(&self) -> Result<Option<FdbValue>, FdbBindingError> {
        self.db
            .run(|trx, _maybe_committed| async move {
                trx.get_ranges_keyvalues(RangeOption::from(&self.queue), true)
                    .next()
                    .await
                    .transpose()
                    .map_err(Into::into)
            })
            .await
    }

    /// Removes the entry at the head of the queue and returns its value.
    ///
    /// Returns `Ok(None)` when the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns the error of the read or the clearing transaction if one fails, or a custom
    /// error wrapping the `Utf8Error` if the stored bytes are not valid UTF-8 (in which case
    /// the entry is left in place).
    ///
    /// NOTE(review): the head is read and cleared in two separate transactions, so two
    /// concurrent consumers may observe the same entry. Confirm whether that matters for
    /// the intended use.
    pub async fn dequeue(&mut self) -> Result<Option<String>, FdbBindingError> {
        match self.first_item().await? {
            None => Ok(None),
            Some(fdb_value) => {
                let key = fdb_value.key();
                // Stored data may have been written by something other than `enqueue`;
                // report bad bytes as an error instead of panicking.
                let value = std::str::from_utf8(fdb_value.value())
                    .map_err(|err| FdbBindingError::new_custom_error(Box::new(err)))?;
                self.db
                    .run(|trx, _maybe_committed| async move {
                        trx.clear(key);
                        Ok(Some(value.to_owned()))
                    })
                    .await
            }
        }
    }
}
/// Sample names that `main` pushes through the queue, in the order they are enqueued.
const LINE: [&str; 13] = [
    "Alice",
    "Bob",
    "Carol",
    "Dave",
    "Eve",
    "Frank",
    "George",
    "Harry",
    "Ian",
    "Jack",
    "Liz",
    "Mary",
    "Nathan",
];
/// Demo entry point: boots the FoundationDB client, enqueues every name in `LINE` into a
/// queue stored under the prefix `"Q"`, then dequeues and prints values until it is empty.
#[tokio::main]
async fn main() -> Result<(), FdbBindingError> {
    // SAFETY: `boot` is invoked exactly once, here, and the guard it returns is dropped
    // explicitly below before `main` returns.
    // NOTE(review): confirm this matches the contract documented for `foundationdb::boot`.
    let network = unsafe { foundationdb::boot() };

    let database = Database::default()?;
    let mut queue = MicroQueue::new(database, "Q").await?;

    for name in LINE {
        queue.enqueue(name).await?;
    }

    while let Some(value) = queue.dequeue().await? {
        println!("{value}");
    }

    // Release the client guard explicitly once all queue work has finished.
    drop(network);
    Ok(())
}