1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::convert::TryFrom;

use futures::{Future, FutureExt};

use super::{BuilderState, Command, KeyOperation, Kv, Output};
use crate::{
    kv::{IncompatibleTypeError, Numeric, Value},
    Error,
};

/// Executes [`Command::Set`] when awaited. Also offers methods to customize the
/// options for the operation.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Builder<'a, Kv, V> {
    state: BuilderState<'a, Options<'a, Kv>, Result<V, Error>>,
}

struct Options<'a, Kv> {
    kv: &'a Kv,
    namespace: Option<String>,
    key: String,
    increment: bool,
    amount: Numeric,
    saturating: bool,
}

impl<'a, K, V> Builder<'a, K, V>
where
    K: Kv,
{
    pub(crate) fn new(
        kv: &'a K,
        namespace: Option<String>,
        increment: bool,
        key: String,
        amount: Numeric,
    ) -> Self {
        Self {
            state: BuilderState::Pending(Some(Options {
                key,
                kv,
                namespace,
                increment,
                amount,
                saturating: true,
            })),
        }
    }

    fn options(&mut self) -> &mut Options<'a, K> {
        if let BuilderState::Pending(Some(options)) = &mut self.state {
            options
        } else {
            panic!("Attempted to use after retrieving the result")
        }
    }

    /// Allows overflowing the value.
    pub fn allow_overflow(mut self) -> Self {
        self.options().saturating = false;
        self
    }
}

impl<'a, K, V> Future for Builder<'a, K, V>
where
    K: Kv,
    V: TryFrom<Numeric, Error = IncompatibleTypeError>,
{
    type Output = Result<V, Error>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        match &mut self.state {
            BuilderState::Executing(future) => future.as_mut().poll(cx),
            BuilderState::Pending(builder) => {
                let Options {
                    kv,
                    namespace,
                    key,
                    increment,
                    amount,
                    saturating,
                } = builder.take().expect("expected builder to have options");
                let future = async move {
                    let result = kv
                        .execute_key_operation(KeyOperation {
                            namespace,
                            key,
                            command: if increment {
                                Command::Increment { amount, saturating }
                            } else {
                                Command::Decrement { amount, saturating }
                            },
                        })
                        .await?;
                    if let Output::Value(Some(Value::Numeric(value))) = result {
                        Ok(V::try_from(value).expect("server should send back identical type"))
                    } else {
                        unreachable!("Unexpected result from key value operation")
                    }
                }
                .boxed();

                self.state = BuilderState::Executing(future);
                self.poll(cx)
            }
        }
    }
}