use crate::DropAll;
use crate::Guard;
use core::ops::Deref;
use core::ops::DerefMut;
use core::unreachable;
use metrique_core::CloseValue;
use tokio::sync::oneshot;
/// Builds the two halves of a slot around a fresh oneshot channel.
///
/// The returned [`SlotGuard`] owns `initial_value` together with the sending half of the
/// channel, and starts out with [`OnParentDrop::Discard`] as its parent-drop mode. The
/// returned `Waiting` owns the receiving half, which yields `T::Closed`.
fn make_slot<T: CloseValue>(initial_value: T) -> (SlotGuard<T>, Waiting<T::Closed>) {
    let (sender, receiver) = oneshot::channel();
    let guard = SlotGuard {
        slot: SlotI::Writable {
            value: initial_value,
            tx: sender,
        },
        parent_drop_mode: OnParentDrop::Discard,
    };
    let waiting = Waiting { rx: receiver };
    (guard, waiting)
}
/// Parent-side handle for a value that is written through a [`SlotGuard`].
///
/// The guard is handed out by [`Slot::open`]. When the guard is dropped it closes its value
/// and sends the result over a oneshot channel; the slot reads that result either when it is
/// closed itself or via [`Slot::wait_for_data`].
pub struct Slot<T: CloseValue> {
    /// The guard half; `Some` until it is handed out by `open` / `open_slot`.
    tx: Option<SlotGuard<T>>,
    /// The receiving half of the channel; consumed by `wait_for_data`.
    rx: Option<Waiting<T::Closed>>,
    /// Closed value cached by `wait_for_data`, if one was received.
    data: Option<T::Closed>,
}
/// A slot that starts out empty and is only created by the first call to [`LazySlot::open`].
///
/// Closing a `LazySlot` that was never opened yields `None`.
pub struct LazySlot<T: CloseValue> {
    /// `None` until `open` is called for the first time.
    slot: Option<Slot<T>>,
}
impl<T: CloseValue> Default for LazySlot<T> {
fn default() -> Self {
Self { slot: None }
}
}
impl<T: CloseValue> CloseValue for LazySlot<T> {
    type Closed = Option<T::Closed>;

    /// Closes the inner slot if one was ever opened.
    ///
    /// Returns `None` when the slot was never opened, otherwise whatever closing the inner
    /// slot returns.
    fn close(self) -> Self::Closed {
        self.slot?.close()
    }
}
impl<T: CloseValue> LazySlot<T> {
    /// Creates the inner slot from `initial_value` and hands out its guard with `mode` as
    /// the parent-drop mode.
    ///
    /// Returns `None` (dropping `initial_value` and `mode`) if this lazy slot has already
    /// been opened.
    pub fn open(&mut self, initial_value: T, mode: OnParentDrop) -> Option<SlotGuard<T>> {
        match self.slot {
            Some(_) => None,
            None => {
                let mut fresh = Slot::new(initial_value);
                // A freshly created slot always still holds its guard.
                let guard = fresh
                    .open(mode)
                    .expect("unreachable: the slot is not opened twice");
                self.slot = Some(fresh);
                Some(guard)
            }
        }
    }
}
/// Mode stored on a [`SlotGuard`] when a slot is opened.
///
/// In the code visible here the mode is only stored on the guard and dropped together with
/// it; nothing reads it back.
pub enum OnParentDrop {
    /// Keeps the given [`FlushGuard`] alive for as long as the slot guard exists.
    ///
    /// NOTE(review): presumably this delays flushing of the parent until the slot guard is
    /// dropped; confirm against the crate's `Guard` type.
    Wait(FlushGuard),
    /// Holds no flush guard.
    ///
    /// NOTE(review): presumably the slot's data is discarded if the parent is closed before
    /// the slot guard is dropped; confirm against the callers.
    Discard,
}
impl<T: CloseValue> Slot<T> {
    /// Creates a slot holding `value`.
    ///
    /// The value stays inside the slot until a guard is handed out by [`Slot::open`].
    pub fn new(value: T) -> Self {
        let (tx, rx) = make_slot(value);
        Self {
            tx: Some(tx),
            rx: Some(rx),
            data: None,
        }
    }

    /// Hands out the guard without touching its parent-drop mode (which starts out as
    /// [`OnParentDrop::Discard`]). Returns `None` if the guard was already handed out.
    #[doc(hidden)]
    #[deprecated(note = "Use Slot::open instead to explicitly chose the on drop behavior.")]
    pub fn open_slot(&mut self) -> Option<SlotGuard<T>> {
        self.tx.take()
    }

    /// Hands out the guard for this slot, with `mode` as its parent-drop mode.
    ///
    /// Returns `None` if the guard was already handed out by an earlier call to `open` or
    /// `open_slot`.
    pub fn open(&mut self, mode: OnParentDrop) -> Option<SlotGuard<T>> {
        let mut guard = self.tx.take()?;
        guard.parent_drop_mode = mode;
        Some(guard)
    }

    /// Waits until the guard has delivered its closed value, then returns the cached result.
    ///
    /// The result is `None` if the sending half went away without sending a value. Later
    /// calls return the cached result immediately.
    ///
    /// This future is cancel-safe: the receiver is polled in place and only cleared once it
    /// has resolved, so dropping the future early (for example from a timeout or `select!`)
    /// leaves the slot able to receive the value later.
    ///
    /// Note that while the guard has not been handed out, the sending half still lives
    /// inside this slot, so this future does not complete.
    pub async fn wait_for_data(&mut self) -> &mut Option<T::Closed> {
        if let Some(waiting) = self.rx.as_mut() {
            // Await by reference: taking the receiver out first would drop it if this
            // future were cancelled, losing the value for good.
            let received = (&mut waiting.rx).await.ok();
            // A resolved oneshot receiver must not be polled again.
            self.rx = None;
            self.data = received;
        }
        &mut self.data
    }
}
impl<T: Default + CloseValue> Default for Slot<T> {
fn default() -> Self {
Self::new(T::default())
}
}
/// Closing a slot yields the closed value delivered by its guard, if there is one.
#[diagnostic::do_not_recommend]
impl<T: CloseValue> CloseValue for Slot<T> {
    type Closed = Option<T::Closed>;

    /// Returns the value delivered by the guard, without waiting for it.
    ///
    /// * If `wait_for_data` already cached a value, that value is returned.
    /// * Otherwise the channel is checked once without blocking; a value that has not been
    ///   sent yet results in `None`.
    /// * If the receiver has already been consumed by `wait_for_data` and no value was
    ///   obtained (for example because the sender went away without sending), `None` is
    ///   returned rather than panicking.
    fn close(self) -> Self::Closed {
        match (self.data, self.rx) {
            (Some(data), _) => Some(data),
            (None, Some(rx)) => rx.take_value(),
            // Reachable: the receiver is gone and nothing was cached.
            (None, None) => None,
        }
    }
}
/// Receiving half of a slot: wraps the oneshot receiver on which the closed value arrives.
struct Waiting<T> {
    /// Receiver paired with the sender stored in the slot's guard.
    rx: oneshot::Receiver<T>,
}
impl<T> Waiting<T> {
fn take_value(mut self) -> Option<T> {
self.rx.try_recv().ok()
}
async fn wait_for_value(self) -> Option<T> {
self.rx.await.ok()
}
}
/// Write handle for the value stored in a slot.
///
/// Dereferences (mutably) to the wrapped `T`. When the guard is dropped, the value is closed
/// and the result is sent over the oneshot channel created together with the guard.
pub struct SlotGuard<T: CloseValue> {
    /// The value plus the sender while the guard is alive; replaced by `SlotI::Dropped`
    /// inside `Drop`.
    slot: SlotI<T>,
    /// Mode chosen when the slot was opened; held for the guard's lifetime and dropped with it.
    parent_drop_mode: OnParentDrop,
}
impl<T: CloseValue> SlotGuard<T> {
    /// Reports whether the sender considers the channel closed, i.e. whether the receiving
    /// half that was created together with this guard is gone.
    pub fn parent_is_closed(&self) -> bool {
        let SlotI::Writable { tx, .. } = &self.slot else {
            unreachable!("this state is only entered after drop")
        };
        tx.is_closed()
    }

    /// Replaces the guard's parent-drop mode with [`OnParentDrop::Wait`], so `flush_guard`
    /// is held until this guard is dropped (or the mode is replaced again).
    pub fn delay_flush(&mut self, flush_guard: FlushGuard) {
        let mode = OnParentDrop::Wait(flush_guard);
        self.parent_drop_mode = mode;
    }
}
/// Wrapper around the crate-internal `Guard`.
///
/// A `FlushGuard` can be attached to a [`SlotGuard`] through [`OnParentDrop::Wait`] or
/// [`SlotGuard::delay_flush`], in which case it lives as long as that slot guard does.
///
/// NOTE(review): presumably the parent is not flushed while this guard is alive; confirm
/// against the definition of `Guard`.
pub struct FlushGuard {
    /// Held only for its drop behavior; never read in the code visible here.
    pub(crate) _drop_guard: Guard,
}
/// Wrapper around the crate-internal `DropAll`.
///
/// NOTE(review): presumably dropping this forces a flush regardless of outstanding
/// `FlushGuard`s; confirm against the definition of `DropAll`.
pub struct ForceFlushGuard {
    /// Held only for its drop behavior; never read in the code visible here.
    pub(crate) _drop_guard: DropAll,
}
impl<T: CloseValue> Deref for SlotGuard<T> {
    type Target = T;

    /// Borrows the value held by the guard.
    fn deref(&self) -> &Self::Target {
        // `SlotI::Dropped` is only ever written inside `Drop`, so a live guard is `Writable`.
        let SlotI::Writable { value, .. } = &self.slot else {
            unreachable!("only occurs after drop")
        };
        value
    }
}
impl<T: CloseValue> DerefMut for SlotGuard<T> {
    /// Mutably borrows the value held by the guard.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // `SlotI::Dropped` is only ever written inside `Drop`, so a live guard is `Writable`.
        let SlotI::Writable { value, .. } = &mut self.slot else {
            unreachable!("only set after drop")
        };
        value
    }
}
impl<T: CloseValue> Drop for SlotGuard<T> {
    /// Closes the held value and sends the result over the channel.
    ///
    /// A failed send (the receiving half is gone) is deliberately ignored.
    fn drop(&mut self) {
        // Move the value and sender out from behind `&mut self`, leaving the `Dropped` marker.
        let previous = std::mem::replace(&mut self.slot, SlotI::Dropped);
        match previous {
            SlotI::Writable { value, tx } => {
                let closed = value.close();
                let _ = tx.send(closed);
            }
            SlotI::Dropped => unreachable!("move out of slot must only occur during drop"),
        }
    }
}
/// Internal state of a [`SlotGuard`].
enum SlotI<T: CloseValue> {
    /// The guard is alive: it owns the value and the sender used to deliver the closed value.
    Writable {
        /// The value exposed through `Deref` / `DerefMut`.
        value: T,
        /// Sender used in `Drop` to deliver `value.close()`.
        tx: oneshot::Sender<T::Closed>,
    },
    /// Marker left behind once `Drop` has moved the value and sender out.
    Dropped,
}
#[cfg(test)]
mod test {
    use metrique_core::CloseValue;

    use crate::Slot;

    use super::{LazySlot, OnParentDrop};

    /// Minimal closeable value whose closed form is always `42`.
    #[derive(Default)]
    struct TestCloseable;

    impl CloseValue for TestCloseable {
        type Closed = usize;

        fn close(self) -> Self::Closed {
            42
        }
    }

    /// A `LazySlot` hands out its guard only once.
    #[test]
    fn test_double_open_lazy() {
        let mut slot: LazySlot<TestCloseable> = LazySlot::default();
        let _guard = slot
            .open(TestCloseable, OnParentDrop::Discard)
            .expect("open once");
        assert!(slot.open(TestCloseable, OnParentDrop::Discard).is_none());
    }

    /// A `Slot` hands out its guard only once.
    #[test]
    fn test_double_open() {
        let mut slot: Slot<TestCloseable> = Slot::default();
        let _guard = slot.open(OnParentDrop::Discard).expect("open once");
        assert!(slot.open(OnParentDrop::Discard).is_none());
    }

    /// Dropping the guard delivers the closed value to `wait_for_data`.
    #[tokio::test]
    async fn test_wait_for_data() {
        let mut slot: Slot<TestCloseable> = Slot::default();
        drop(slot.open(OnParentDrop::Discard));
        assert_eq!(slot.wait_for_data().await, &Some(42));
    }

    /// The guard observes the parent slot being dropped.
    #[test]
    fn test_parent_is_closed() {
        let mut slot: Slot<TestCloseable> = Slot::default();
        let guard = slot.open(OnParentDrop::Discard).unwrap();
        assert!(!guard.parent_is_closed());
        drop(slot);
        assert!(guard.parent_is_closed());
    }
}