use futures_channel::oneshot::{channel as oneshot, Receiver, Sender};
use futures_util::lock::Mutex;
use log::debug;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// A slot that may hold an item of type `T` and hands it out through leases.
///
/// The slot's contents are tracked by a `State<T>` kept behind an async mutex;
/// a separate atomic flag mirrors whether a lease has been handed out so that
/// it can be queried without taking the lock.
#[derive(Default, Debug)]
pub struct Potential<T> {
// Current contents of the slot: present, leased out, or gone.
state: Mutex<State<T>>,
// Flag stored as `true` when a lease is handed out and as `false` when a
// return is observed or a new item is set.
leased: AtomicBool,
}
/// Internal contents of a `Potential`.
#[derive(Debug)]
enum State<T> {
// The item is stored in the slot and can be leased right away.
Present(T),
// The item (or the right to set one) has been handed out; the receiver is
// the channel end on which it is expected to come back.
Leased(Receiver<T>),
// There is no item in the slot.
Gone,
}
impl<T> Default for State<T> {
fn default() -> Self {
Self::Gone
}
}
impl<T> Potential<T> {
    /// Creates a potential that starts out holding `item`.
    pub fn new(item: T) -> Self {
        let mut me = Potential::empty();
        me.set(item);
        me
    }

    /// Creates a potential that holds no item.
    pub fn empty() -> Self {
        Potential {
            state: Mutex::new(State::Gone),
            leased: AtomicBool::new(false),
        }
    }

    /// Returns the current value of the lease flag.
    ///
    /// The flag is set when [`lease`](Self::lease) hands out a `Lease` or a
    /// `Gone`, and cleared by [`set`](Self::set) or when a later call to
    /// [`lease`](Self::lease) / [`get_mut`](Self::get_mut) observes the return.
    /// It is therefore a hint: it may still read `true` after the lease has
    /// been dropped until one of those calls runs.
    pub fn is_leased(&self) -> bool {
        self.leased.load(Ordering::Relaxed)
    }

    /// Stores `item` in the slot, discarding whatever state was there before
    /// (including the receiving end of any outstanding lease).
    pub fn set(&mut self, item: T) {
        self.state = Mutex::new(State::Present(item));
        self.leased.store(false, Ordering::Relaxed);
    }

    /// Same as [`lease`](Self::lease), but takes the potential by `Arc` so the
    /// returned future owns its reference and does not borrow from the caller.
    pub async fn lease_on_arc(self: Arc<Self>) -> Result<Lease<T>, Gone<T>> {
        self.lease().await
    }

    /// Same as [`lease`](Self::lease), but takes the potential by `Rc` so the
    /// returned future owns its reference and does not borrow from the caller.
    pub async fn lease_on_rc(self: Rc<Self>) -> Result<Lease<T>, Gone<T>> {
        self.lease().await
    }

    /// Gives direct mutable access to the item, bypassing the lock.
    ///
    /// If the item is currently leased out, this waits until it is returned
    /// (or until the lease is dropped without returning it). Returns `None`
    /// when there is no item.
    pub async fn get_mut(&mut self) -> Option<&mut T> {
        // Exclusive access to `self` means the mutex does not need locking.
        let state = self.state.get_mut();
        loop {
            break match state {
                State::Gone => None,
                State::Present(present) => {
                    self.leased.store(false, Ordering::Relaxed);
                    Some(present)
                }
                State::Leased(receiver) => {
                    // Wait in place, then re-examine the resulting state.
                    *state = Self::await_return(receiver).await;
                    self.leased.store(false, Ordering::Relaxed);
                    continue;
                }
            };
        }
    }

    /// Leases the item out of the potential.
    ///
    /// Waits for the state lock and, if the item is currently leased out, for
    /// that lease to be returned. Then:
    ///
    /// * `Ok(Lease)` is returned when an item is present; the item travels
    ///   with the lease and is sent back when the lease is dropped.
    /// * `Err(Gone)` is returned when there is no item; the `Gone` value
    ///   carries the return channel so the caller may supply a new item via
    ///   `Gone::set`.
    ///
    /// In both cases the slot is marked as leased until the handle is dropped.
    ///
    /// Dropping the returned future while it is still waiting leaves the
    /// potential untouched: the pending return channel stays in the shared
    /// state, so the outstanding lease can still hand its item back.
    pub async fn lease(&self) -> Result<Lease<T>, Gone<T>> {
        let mut state = self.state.lock().await;

        // Await an outstanding lease *in place*. The receiver must not be
        // moved out of the shared state before the await completes, otherwise
        // cancelling this future would destroy the channel and lose the item.
        if let State::Leased(pending) = state.deref_mut() {
            let returned = Self::await_return(pending).await;
            *state = returned;
            self.leased.store(false, Ordering::Relaxed);
        }

        // No await points below: the hand-over is atomic with respect to
        // cancellation.
        let (sender, receiver) = oneshot();
        let previous = std::mem::replace(state.deref_mut(), State::Leased(receiver));
        self.leased.store(true, Ordering::Relaxed);
        match previous {
            State::Present(item) => Ok(Lease::new(item, sender)),
            State::Gone => Err(Gone::new(sender)),
            State::Leased(_) => {
                unreachable!("an outstanding lease is always resolved before handing out a new one")
            }
        }
    }

    /// Waits on the return channel and converts the outcome into a state:
    /// `Present` when the item came back, `Gone` when the sending side was
    /// dropped without sending.
    async fn await_return(receiver: &mut Receiver<T>) -> State<T> {
        if let Ok(item) = receiver.await {
            State::Present(item)
        } else {
            debug!("Lease dropped without sending the item back. Subsequent call will panic unless you set a new potential.");
            State::Gone
        }
    }
}
/// A handle to an item taken out of a `Potential`.
///
/// The handle dereferences to the item. When it is dropped, the item is sent
/// back over the `owner` channel.
#[derive(Debug)]
pub struct Lease<T> {
// The leased item; emptied when the item is sent back or stolen.
item: Option<T>,
// Sending end of the return channel; emptied when used or discarded.
owner: Option<Sender<T>>,
}
impl<T> Lease<T> {
fn new(item: T, owner: Sender<T>) -> Self {
Lease {
item: Some(item),
owner: Some(owner),
}
}
pub fn replace(&mut self, replacement: T) -> T {
std::mem::replace(&mut self.item, Some(replacement)).expect("item must be set")
}
pub fn steal(mut self) -> T {
let item = self.item.take().expect("item must be set");
let owner = self.owner.take().expect("owner must be set");
drop(owner);
std::mem::forget(self);
item
}
}
impl<T> Deref for Lease<T> {
    type Target = T;

    /// Borrows the leased item.
    ///
    /// # Panics
    ///
    /// Panics if the item slot is empty.
    fn deref(&self) -> &T {
        let held = self.item.as_ref();
        held.expect("item must be set")
    }
}
impl<T> DerefMut for Lease<T> {
    /// Mutably borrows the leased item.
    ///
    /// # Panics
    ///
    /// Panics if the item slot is empty.
    fn deref_mut(&mut self) -> &mut T {
        let held = self.item.as_mut();
        held.expect("item must be set")
    }
}
impl<T> Drop for Lease<T> {
    /// Sends the item back over the return channel, if both are still held.
    fn drop(&mut self) {
        let owner = match self.owner.take() {
            Some(owner) => owner,
            // No channel to return through; nothing to do.
            None => return,
        };
        if let Some(item) = self.item.take() {
            // A failed send means the receiving side no longer exists; the
            // item is simply dropped in that case.
            let _ = owner.send(item);
        }
    }
}
/// Returned from leasing when the `Potential` holds no item.
///
/// It carries the return channel, so a fresh item can be supplied with `set`.
#[derive(Debug)]
pub struct Gone<T> {
// Sending end of the return channel of the potential that produced this value.
owner: Sender<T>,
}
impl<T> Gone<T> {
    /// Wraps the return channel of an empty potential.
    fn new(owner: Sender<T>) -> Self {
        Self { owner }
    }

    /// Supplies a new item and turns this value into a regular lease on it.
    ///
    /// The lease uses the same return channel this value was carrying.
    pub fn set(self, item: T) -> Lease<T> {
        let Gone { owner } = self;
        Lease::new(item, owner)
    }
}
impl<T> fmt::Display for Gone<T> {
    /// Writes a fixed, human-readable description of the condition.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Potential item is gone")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::Future;
    use std::task::{Context, Poll};

    // Compile-time probes: each only type-checks when the bound holds.
    fn is_sync<T>(_: T)
    where
        T: Sync,
    {
    }

    fn is_send<T>(_: T)
    where
        T: Send,
    {
    }

    fn is_static<T>(_: T)
    where
        T: 'static,
    {
    }

    #[test]
    fn lease_is_sync() {
        let potential = Potential::new(true);
        is_sync(potential.lease());
    }

    #[test]
    fn lease_is_send() {
        let potential = Potential::new(true);
        is_send(potential.lease());
    }

    #[test]
    fn lease_on_rc_is_static() {
        let potential = Rc::new(Potential::new(true));
        is_static(potential.lease_on_rc());
    }

    #[test]
    fn lease_on_arc_is_static() {
        let potential = Arc::new(Potential::new(true));
        is_static(potential.lease_on_arc());
    }

    #[test]
    fn lease_on_arc_is_sync() {
        let potential = Arc::new(Potential::new(true));
        is_sync(potential.lease_on_arc());
    }

    #[test]
    fn lease_on_arc_is_send() {
        let potential = Arc::new(Potential::new(true));
        is_send(potential.lease_on_arc());
    }

    /// A second lease stays pending until the first one is dropped.
    #[test]
    fn it_blocks() {
        let potential = Potential::new(true);
        let mut first_fut = Box::pin(potential.lease());
        let mut second_fut = Box::pin(potential.lease());

        let first = ready_ok(first_fut.as_mut().poll(&mut cx()));
        let second_poll = second_fut.as_mut().poll(&mut cx());
        assert!(
            second_poll.is_pending(),
            "Second lease is blocked by the first"
        );

        drop(first);
        let second = ready_ok(second_fut.as_mut().poll(&mut cx()));
        drop(second);
    }

    /// A forgotten lease never returns its item, so waiters stay pending.
    #[test]
    fn it_hangs_on_no_drop() {
        let potential = Potential::new(true);
        let mut first_fut = Box::pin(potential.lease());
        let mut second_fut = Box::pin(potential.lease());

        let first = ready_ok(first_fut.as_mut().poll(&mut cx()));
        let before_forget = second_fut.as_mut().poll(&mut cx());
        assert!(
            before_forget.is_pending(),
            "Second lease is blocked by the first"
        );

        std::mem::forget(first);
        let after_forget = second_fut.as_mut().poll(&mut cx());
        assert!(
            after_forget.is_pending(),
            "Second lease is blocked by the first"
        );
    }

    /// Stealing the item makes the waiting lease resolve to an error.
    #[test]
    fn it_handles_stealing() {
        let potential = Potential::new(true);
        let mut first_fut = Box::pin(potential.lease());
        let mut second_fut = Box::pin(potential.lease());

        let first = ready_ok(first_fut.as_mut().poll(&mut cx()));
        let second_poll = second_fut.as_mut().poll(&mut cx());
        assert!(
            second_poll.is_pending(),
            "Second lease is blocked by the first"
        );

        first.steal();
        ready_err(second_fut.as_mut().poll(&mut cx()));
    }

    /// Unwraps a poll that must be `Ready(Ok(_))`.
    fn ready_ok<T, E>(poll: Poll<Result<T, E>>) -> T {
        if let Poll::Ready(Ok(value)) = poll {
            value
        } else {
            panic!("poll is not ready or is err")
        }
    }

    /// Unwraps a poll that must be `Ready(Err(_))`.
    fn ready_err<T, E>(poll: Poll<Result<T, E>>) -> E {
        if let Poll::Ready(Err(error)) = poll {
            error
        } else {
            panic!("poll is not ready or is ok")
        }
    }

    /// Builds a task context backed by a waker that does nothing.
    fn cx() -> Context<'static> {
        Context::from_waker(futures_util::task::noop_waker_ref())
    }
}