1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::ops::DerefMut;

use super::change::{ChangeReader, ChangeValNoWake};
use super::shared_state::SharedState;
use super::*;
use crate::IntoReader;
use futures::channel::oneshot;
use futures::never::Never;
use tokio::select;

/// The entry point for getting the latest snapshots of values
/// written by an `EventualWriter`. It supports multiple styles
/// of observation:
///   * `subscribe` returns an `EventualReader` for ongoing observation,
///   * `value` returns a future resolving to a snapshot,
///   * `value_immediate` returns the current snapshot, if any, without
///     waiting.
///
/// Cloning an `Eventual` clones the handle to the shared state rather than
/// the state itself.
pub struct Eventual<T> {
    // Reference-counted state; the same `Arc` is handed to the writer in
    // `new` and to every reader created from this handle.
    state: Arc<SharedState<T>>,
}

impl<T> Eventual<T>
where
    T: Value,
{
    /// Build a connected writer/reader pair.
    ///
    /// Both halves refer to a single `SharedState`. The sender of a oneshot
    /// channel is stored in that state and the matching receiver is handed
    /// to the writer.
    pub fn new() -> (EventualWriter<T>, Self) {
        let (tx, rx) = oneshot::channel();
        let shared = Arc::new(SharedState::new(tx));
        let writer = EventualWriter::new(&shared, rx);
        (writer, Self { state: shared })
    }

    /// Build an eventual that has `value` written to it exactly once.
    /// Handy as a "mock" eventual to hand to consumers.
    pub fn from_value(value: T) -> Self {
        let (mut writer, reader) = Self::new();
        writer.write(value);
        // `writer` goes out of scope here, so no further writes can follow.
        reader
    }

    /// Spawn a tokio task that drives a writer, and return the matching
    /// eventual. Used throughout the library for eventuals that keep
    /// updating over time.
    ///
    /// The task races two futures and ends as soon as either completes:
    /// the writer's `closed()` future, and the future produced by calling
    /// `f` with the writer. `f` is invoked inside the spawned task, when
    /// its branch is first polled.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn spawn<F, Fut>(f: F) -> Self
    where
        F: 'static + Send + FnOnce(EventualWriter<T>) -> Fut,
        Fut: Future<Output = Result<Never, Closed>> + Send,
    {
        let (writer, eventual) = Eventual::new();
        tokio::spawn(async move {
            // Obtain the close notification before the writer is moved
            // into the producer future below.
            let closed = writer.closed();
            // Wrapped in an `async` block so that `f` is not called until
            // this branch is polled.
            let producer = async { f(writer).await };
            select! {
                _ = closed => {}
                _ = producer => {}
            }
        });
        eventual
    }

    /// Create a reader over present and future snapshots of this Eventual.
    /// Observations made through a reader account for that reader's own
    /// state, such that:
    ///   * The same value is never observed twice in a row
    ///   * Observations only ever move forward in time
    ///   * The last observation is eventually consistent with the final
    ///     write.
    pub fn subscribe(&self) -> EventualReader<T> {
        let shared = Arc::clone(&self.state);
        EventualReader::new(shared)
    }

    /// Get a future resolving to a snapshot of the current value if there
    /// is one, or otherwise to a later snapshot. The snapshot returned is
    /// determined by when the future is polled, not by when it is created.
    pub fn value(&self) -> ValueFuture<T> {
        let reader = Arc::clone(&self.state).subscribe();
        ValueFuture {
            change: Some(reader),
        }
    }

    /// Take a snapshot of the current value without waiting.
    ///
    /// Returns `None` when nothing has been written, a clone of the value
    /// when one is present, and a clone of the optional final value when the
    /// state is finalized.
    ///
    /// # Panics
    ///
    /// Panics if locking `last_write` returns an error.
    pub fn value_immediate(&self) -> Option<T> {
        let mut last_write = self.state.last_write.lock().unwrap();
        match last_write.deref_mut() {
            ChangeValNoWake::Finalized(last) => last.clone(),
            ChangeValNoWake::Value(current) => Some(current.clone()),
            ChangeValNoWake::None => None,
        }
    }

    /// Number of entries currently held in the shared state's `subscribers`
    /// collection. Only compiled with the `trace` feature.
    ///
    /// # Panics
    ///
    /// Panics if locking `subscribers` returns an error.
    #[cfg(feature = "trace")]
    pub fn subscriber_count(&self) -> usize {
        let subscribers = self.state.subscribers.lock().unwrap();
        subscribers.len()
    }
}

/// Future returned by `Eventual::value`. Its output is `Result<T, Closed>`.
pub struct ValueFuture<T> {
    // `Some` while the future is still pending; reset to `None` by `poll`
    // right before it returns `Poll::Ready`.
    change: Option<ChangeReader<T>>,
}

impl<T> Future for ValueFuture<T>
where
    T: Value,
{
    type Output = Result<T, Closed>;

    /// Polls the held change reader, passing `&None` as its first argument.
    ///
    /// Returns `Poll::Pending` while the reader yields nothing. Once it
    /// yields a result, the reader is dropped and the result is returned as
    /// `Poll::Ready`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`, since the
    /// reader has been cleared by then.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let reader = self.change.as_mut().unwrap();
        if let Some(outcome) = reader.change.poll(&None, cx) {
            // Release the reader as soon as a result is available.
            self.change = None;
            Poll::Ready(outcome)
        } else {
            Poll::Pending
        }
    }
}

impl<T> Clone for Eventual<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            state: self.state.clone(),
        }
    }
}

impl<T> IntoReader for &'_ Eventual<T>
where
    T: Value,
{
    type Output = T;
    #[inline]
    fn into_reader(self) -> EventualReader<Self::Output> {
        self.subscribe()
    }
}

impl<T> IntoReader for Eventual<T>
where
    T: Value,
{
    type Output = T;
    #[inline]
    fn into_reader(self) -> EventualReader<Self::Output> {
        self.subscribe()
    }
}