1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
pub(crate) use self::sync::{AcquireError, OwnedSemaphorePermit as Permit};
use futures_core::ready;
use std::{
    fmt,
    future::Future,
    mem,
    pin::Pin,
    sync::{Arc, Weak},
    task::{Context, Poll},
};
use tokio::sync;

/// A poll-based handle onto a shared `tokio` semaphore.
///
/// Each handle tracks its own acquisition progress in `state`, while the
/// underlying semaphore is shared (via `Arc`) between all clones.
#[derive(Debug)]
pub(crate) struct Semaphore {
    /// The shared semaphore that permits are acquired from.
    semaphore: Arc<sync::Semaphore>,
    /// This handle's acquisition progress: idle, waiting, or holding a permit.
    state: State,
}

/// A handle that can close the semaphore it was created alongside.
///
/// Only a weak reference is held, so this handle does not keep the semaphore
/// alive on its own.
#[derive(Debug)]
pub(crate) struct Close {
    /// Weak reference to the shared semaphore; upgraded when closing.
    semaphore: Weak<sync::Semaphore>,
}

/// Acquisition progress of a single [`Semaphore`] handle.
enum State {
    /// An acquisition is in flight; holds the boxed future that resolves to
    /// either an owned permit or an `AcquireError`.
    Waiting(Pin<Box<dyn Future<Output = Result<Permit, AcquireError>> + Send + Sync + 'static>>),
    /// A permit has been acquired and is being held until it is taken.
    Ready(Permit),
    /// No permit is held and no acquisition is in progress.
    Empty,
}

impl Semaphore {
    pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
        let semaphore = Arc::new(sync::Semaphore::new(permits));
        let close = Close {
            semaphore: Arc::downgrade(&semaphore),
        };
        let semaphore = Self {
            semaphore,
            state: State::Empty,
        };
        (semaphore, close)
    }

    pub(crate) fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(sync::Semaphore::new(permits)),
            state: State::Empty,
        }
    }

    pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), AcquireError>> {
        loop {
            self.state = match self.state {
                State::Ready(_) => return Poll::Ready(Ok(())),
                State::Waiting(ref mut fut) => {
                    let permit = ready!(Pin::new(fut).poll(cx))?;
                    State::Ready(permit)
                }
                State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
            };
        }
    }

    pub(crate) fn take_permit(&mut self) -> Option<Permit> {
        if let State::Ready(permit) = mem::replace(&mut self.state, State::Empty) {
            return Some(permit);
        }
        None
    }
}

impl Clone for Semaphore {
    fn clone(&self) -> Self {
        Self {
            semaphore: self.semaphore.clone(),
            state: State::Empty,
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            State::Waiting(_) => f
                .debug_tuple("State::Waiting")
                .field(&format_args!("..."))
                .finish(),
            State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(),
            State::Empty => f.debug_tuple("State::Empty").finish(),
        }
    }
}

impl Close {
    /// Closes the semaphore, waking any remaining tasks currently awaiting a
    /// permit.
    ///
    /// Does nothing if the semaphore has already been dropped, i.e. the weak
    /// reference can no longer be upgraded.
    pub(crate) fn close(self) {
        let Self { semaphore: weak } = self;
        if let Some(live) = weak.upgrade() {
            live.close();
        }
    }
}