use std::marker::PhantomData;
/// One-shot permit that pairs a commit callback with an abort callback.
///
/// A permit is consumed by [`SendPermit::commit`] or [`SendPermit::abort`].
/// If it is dropped while the abort callback is still armed, the `Drop`
/// implementation runs that callback.
///
/// `T` is the error type the commit callback may return. The permit never
/// stores a `T`, so it is `Send` and `Sync` for every `T`.
#[must_use = "SendPermit must be consumed via commit() or abort()"]
pub struct SendPermit<T> {
    /// Commit callback; `None` once it has been taken.
    commit_fn: Option<Box<dyn FnOnce(&[u8]) -> Result<(), T> + Send + Sync>>,
    /// Abort callback; `None` once it has been run or disarmed.
    abort_fn: Option<Box<dyn FnOnce() + Send + Sync>>,
    // `fn() -> T` rather than `T`: the permit only ever *produces* a `T` (out
    // of the commit callback) and never owns one, so its auto traits must not
    // depend on `T`. Both boxed callbacks are already `Send + Sync`.
    _marker: PhantomData<fn() -> T>,
}
impl<T> SendPermit<T> {
pub fn new<F, A>(commit_fn: F, abort_fn: A) -> Self
where
F: FnOnce(&[u8]) -> Result<(), T> + Send + Sync + 'static,
A: FnOnce() + Send + Sync + 'static,
{
Self {
commit_fn: Some(Box::new(commit_fn)),
abort_fn: Some(Box::new(abort_fn)),
_marker: PhantomData,
}
}
pub fn commit(mut self, data: &[u8]) -> Result<(), T> {
if let Some(commit_fn) = self.commit_fn.take() {
match commit_fn(data) {
Ok(()) => {
self.abort_fn = None;
Ok(())
}
Err(error) => {
if let Some(abort_fn) = self.abort_fn.take() {
abort_fn();
}
Err(error)
}
}
} else {
panic!("SendPermit already consumed")
}
}
pub fn abort(mut self) {
if let Some(abort_fn) = self.abort_fn.take() {
self.commit_fn = None;
abort_fn();
}
}
}
impl<T> Drop for SendPermit<T> {
    /// Runs the abort callback if it is still armed.
    ///
    /// `commit` and `abort` both take the callback out of the permit before
    /// returning, so this only fires for a permit that was never consumed (or
    /// whose commit callback unwound before the abort callback was disarmed).
    fn drop(&mut self) {
        let pending = self.abort_fn.take();
        match pending {
            Some(rollback) => rollback(),
            None => {}
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared buffer collecting every byte handed to the commit callback.
    type CommitLog = Arc<Mutex<Vec<u8>>>;
    /// Shared flag raised by the abort callback.
    type AbortFlag = Arc<Mutex<bool>>;

    /// Builds a permit whose commit callback appends to a log and always
    /// succeeds, and whose abort callback raises a flag. Returns the permit
    /// together with handles for inspecting both afterwards.
    fn recording_permit() -> (SendPermit<()>, CommitLog, AbortFlag) {
        let log: CommitLog = Arc::new(Mutex::new(Vec::new()));
        let flag: AbortFlag = Arc::new(Mutex::new(false));
        let log_sink = Arc::clone(&log);
        let flag_sink = Arc::clone(&flag);
        let permit = SendPermit::new(
            move |payload: &[u8]| {
                log_sink.lock().unwrap().extend_from_slice(payload);
                Ok::<(), ()>(())
            },
            move || {
                *flag_sink.lock().unwrap() = true;
            },
        );
        (permit, log, flag)
    }

    /// A successful commit forwards the payload and leaves abort untouched.
    #[test]
    fn test_permit_commit() {
        let (permit, log, flag) = recording_permit();
        let outcome = permit.commit(b"test data");
        assert!(outcome.is_ok());
        assert_eq!(*log.lock().unwrap(), b"test data");
        assert!(!*flag.lock().unwrap());
    }

    /// An explicit abort runs the abort callback and never commits.
    #[test]
    fn test_permit_abort() {
        let (permit, log, flag) = recording_permit();
        permit.abort();
        assert!(log.lock().unwrap().is_empty());
        assert!(*flag.lock().unwrap());
    }

    /// A failing commit returns its error and runs abort exactly once.
    #[test]
    fn test_permit_commit_error_aborts_reservation() {
        let abort_calls = Arc::new(Mutex::new(0usize));
        let abort_calls_sink = Arc::clone(&abort_calls);
        let permit = SendPermit::new(
            move |payload: &[u8]| {
                assert_eq!(payload, b"too large");
                Err::<(), &'static str>("send queue rejected commit")
            },
            move || {
                *abort_calls_sink.lock().unwrap() += 1;
            },
        );
        let outcome = permit.commit(b"too large");
        assert_eq!(outcome, Err("send queue rejected commit"));
        assert_eq!(*abort_calls.lock().unwrap(), 1);
    }

    /// Dropping an unconsumed permit behaves like an abort.
    #[test]
    fn test_permit_drop_aborts() {
        let (permit, log, flag) = recording_permit();
        drop(permit);
        assert!(log.lock().unwrap().is_empty());
        assert!(*flag.lock().unwrap());
    }
}