opengauss 0.1.0

A native, synchronous openGauss client
Documentation
//! Asynchronous notifications.

use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};

/// Notifications from a PostgreSQL backend.
pub struct Notifications<'a> {
    connection: ConnectionRef<'a>,
}

impl<'a> Notifications<'a> {
    pub(crate) fn new(connection: ConnectionRef<'a>) -> Notifications<'a> {
        Notifications { connection }
    }

    /// Returns the number of already buffered pending notifications.
    pub fn len(&self) -> usize {
        self.connection.notifications().len()
    }

    /// Determines if there are any already buffered pending notifications.
    pub fn is_empty(&self) -> bool {
        self.connection.notifications().is_empty()
    }

    /// Returns a nonblocking iterator over notifications.
    ///
    /// If there are no already buffered pending notifications, this iterator will poll the connection but will not
    /// block waiting on notifications over the network. A return value of `None` either indicates that there are no
    /// pending notifications or that the server has disconnected.
    ///
    /// # Note
    ///
    /// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
    pub fn iter(&mut self) -> Iter<'_> {
        Iter {
            connection: self.connection.as_ref(),
        }
    }

    /// Returns a blocking iterator over notifications.
    ///
    /// If there are no already buffered pending notifications, this iterator will block indefinitely waiting on the
    /// PostgreSQL backend server to send one. It will only return `None` if the server has disconnected.
    pub fn blocking_iter(&mut self) -> BlockingIter<'_> {
        BlockingIter {
            connection: self.connection.as_ref(),
        }
    }

    /// Returns an iterator over notifications which blocks a limited amount of time.
    ///
    /// If there are no already buffered pending notifications, this iterator will block waiting on the PostgreSQL
    /// backend server to send one up to the provided timeout. A return value of `None` either indicates that there are
    /// no pending notifications or that the server has disconnected.
    ///
    /// # Note
    ///
    /// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
    pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
        TimeoutIter {
            delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
            timeout,
            connection: self.connection.as_ref(),
        }
    }
}

/// A nonblocking iterator over pending notifications.
pub struct Iter<'a> {
    connection: ConnectionRef<'a>,
}

impl<'a> FallibleIterator for Iter<'a> {
    type Item = Notification;
    type Error = Error;

    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
        if let Some(notification) = self.connection.notifications_mut().pop_front() {
            return Ok(Some(notification));
        }

        self.connection
            .poll_block_on(|_, notifications, _| Poll::Ready(Ok(notifications.pop_front())))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.connection.notifications().len(), None)
    }
}

/// A blocking iterator over pending notifications.
pub struct BlockingIter<'a> {
    connection: ConnectionRef<'a>,
}

impl<'a> FallibleIterator for BlockingIter<'a> {
    type Item = Notification;
    type Error = Error;

    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
        if let Some(notification) = self.connection.notifications_mut().pop_front() {
            return Ok(Some(notification));
        }

        self.connection
            .poll_block_on(|_, notifications, done| match notifications.pop_front() {
                Some(notification) => Poll::Ready(Ok(Some(notification))),
                None if done => Poll::Ready(Ok(None)),
                None => Poll::Pending,
            })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.connection.notifications().len(), None)
    }
}

/// A time-limited blocking iterator over pending notifications.
pub struct TimeoutIter<'a> {
    connection: ConnectionRef<'a>,
    delay: Pin<Box<Sleep>>,
    timeout: Duration,
}

impl<'a> FallibleIterator for TimeoutIter<'a> {
    type Item = Notification;
    type Error = Error;

    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
        if let Some(notification) = self.connection.notifications_mut().pop_front() {
            self.delay.as_mut().reset(Instant::now() + self.timeout);
            return Ok(Some(notification));
        }

        let delay = &mut self.delay;
        let timeout = self.timeout;
        self.connection.poll_block_on(|cx, notifications, done| {
            match notifications.pop_front() {
                Some(notification) => {
                    delay.as_mut().reset(Instant::now() + timeout);
                    return Poll::Ready(Ok(Some(notification)));
                }
                None if done => return Poll::Ready(Ok(None)),
                None => {}
            }

            ready!(delay.poll_unpin(cx));
            Poll::Ready(Ok(None))
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.connection.notifications().len(), None)
    }
}