use std::cmp::Ordering;
use thiserror::Error;
use crate::{subscriber::Subscriber, AcceptXmrError, Invoice, InvoiceId, SubIndex};
/// Handle to the sled tree in which invoices are stored, keyed by their bincode-encoded ID.
pub(crate) struct InvoicesDb(sled::Tree);
impl InvoicesDb {
pub fn new(path: &str) -> Result<InvoicesDb, InvoiceStorageError> {
let db = sled::Config::default()
.path(path)
.flush_every_ms(None)
.open()?;
let tree = db.open_tree(b"pending invoices")?;
tree.set_merge_operator(InvoicesDb::update_merge);
Ok(InvoicesDb(tree))
}
pub fn insert(&self, invoice: &Invoice) -> Result<Option<Invoice>, InvoiceStorageError> {
let invoice_id = invoice.id();
let key = bincode::encode_to_vec(&invoice_id, bincode::config::standard())?;
let value = bincode::encode_to_vec(&invoice, bincode::config::standard())?;
let old = self.0.insert(key, value)?;
if let Some(old_value) = old {
Ok(Some(
bincode::decode_from_slice(&old_value, bincode::config::standard())?.0,
))
} else {
Ok(None)
}
}
pub fn remove(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, InvoiceStorageError> {
let key = bincode::encode_to_vec(&invoice_id, bincode::config::standard())?;
let old = self.0.remove(key).transpose();
old.map(|ivec_or_err| {
Ok(bincode::decode_from_slice(&ivec_or_err?, bincode::config::standard())?.0)
})
.transpose()
}
pub fn get(&self, invoice_id: InvoiceId) -> Result<Option<Invoice>, InvoiceStorageError> {
let key = bincode::encode_to_vec(&invoice_id, bincode::config::standard())?;
let current = self.0.get(key).transpose();
current
.map(|ivec_or_err| {
Ok(bincode::decode_from_slice(&ivec_or_err?, bincode::config::standard())?.0)
})
.transpose()
}
pub fn iter(
&self,
) -> impl DoubleEndedIterator<Item = Result<Invoice, InvoiceStorageError>> + Send + Sync {
self.0.iter().values().flat_map(|r| {
r.map(|ivec| {
bincode::decode_from_slice(&ivec, bincode::config::standard())
.map_err(InvoiceStorageError::from)
.map(|tup| tup.0)
})
.map_err(InvoiceStorageError::from)
})
}
pub fn contains_key(&self, invoice_id: InvoiceId) -> Result<bool, InvoiceStorageError> {
let key = bincode::encode_to_vec(&invoice_id, bincode::config::standard())?;
self.0.contains_key(key).map_err(InvoiceStorageError::from)
}
pub fn contains_sub_index(&self, sub_index: SubIndex) -> Result<bool, InvoiceStorageError> {
let key = bincode::encode_to_vec(&sub_index, bincode::config::standard())?;
Ok(self.0.scan_prefix(key).next().is_some())
}
pub fn update(&self, invoice_id: InvoiceId, new: &Invoice) -> Result<Invoice, AcceptXmrError> {
let key = bincode::encode_to_vec(&invoice_id, bincode::config::standard())
.map_err(InvoiceStorageError::from)?;
let new_ivec = bincode::encode_to_vec(&new, bincode::config::standard())
.map_err(InvoiceStorageError::from)?;
let maybe_old = self
.0
.merge(key, new_ivec)
.map_err(InvoiceStorageError::from)?;
match maybe_old {
Some(ivec) => Ok(
bincode::decode_from_slice(&ivec, bincode::config::standard())
.map_err(InvoiceStorageError::from)?
.0,
),
None => Err(AcceptXmrError::from(InvoiceStorageError::Update(
invoice_id,
))),
}
}
pub fn subscribe(
&self,
invoice_id: InvoiceId,
) -> Result<Option<Subscriber>, InvoiceStorageError> {
let prefix = bincode::encode_to_vec(&invoice_id, bincode::config::standard())?;
let sled_subscriber = self.0.watch_prefix(prefix);
if self.contains_key(invoice_id)? {
Ok(Some(Subscriber::new(sled_subscriber)))
} else {
Ok(None)
}
}
pub fn subscribe_all(&self) -> Subscriber {
let sled_subscriber = self.0.watch_prefix(vec![]);
Subscriber::new(sled_subscriber)
}
pub fn flush(&self) -> Result<(), InvoiceStorageError> {
self.0.flush()?;
Ok(())
}
pub fn clone(&self) -> InvoicesDb {
InvoicesDb(self.0.clone())
}
pub fn lowest_height(&self) -> Result<Option<u64>, InvoiceStorageError> {
self.iter()
.min_by(|invoice_1, invoice_2| {
match (invoice_1, invoice_2) {
(Err(_), _) => Ordering::Greater,
(_, Err(_)) => Ordering::Less,
(Ok(inv1), Ok(inv2)) => inv1.current_height().cmp(&inv2.current_height()),
}
})
.transpose()
.map(|maybe_invoice| maybe_invoice.map(|invoice| invoice.current_height()))
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
fn update_merge(_key: &[u8], old_value: Option<&[u8]>, new_value: &[u8]) -> Option<Vec<u8>> {
if old_value.is_some() {
Some(new_value.to_vec())
} else {
None
}
}
}
/// Errors that can occur while storing, loading, or updating invoices.
#[derive(Error, Debug)]
pub enum InvoiceStorageError {
/// An error reported by the underlying sled database.
#[error("database error: {0}")]
Database(#[from] sled::Error),
/// Produced by `InvoicesDb::update` when the merge yields no value for the given invoice ID.
#[error("no value with key {0} to update")]
Update(InvoiceId),
/// Encoding a key or value with bincode failed.
#[error("Serialization error: {0}")]
Serialize(#[from] bincode::error::EncodeError),
/// Decoding a stored value with bincode failed.
#[error("Deserialization error: {0}")]
Deserialize(#[from] bincode::error::DecodeError),
}