1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use futures::{channel::oneshot::Receiver, future::Shared};
use super::{change::ChangeValNoWake, *};
use crate::error::Closed;
use futures::FutureExt;
use std::{
mem,
ops::DerefMut,
sync::{Arc, Weak},
};
pub struct EventualWriter<T>
where
T: Value,
{
state: Weak<SharedState<T>>,
closed: Shared<Receiver<()>>,
}
impl<T> Drop for EventualWriter<T>
where
T: Value,
{
fn drop(&mut self) {
let _ignore = self.write_private(Err(Closed));
}
}
impl<T> EventualWriter<T>
where
T: Value,
{
pub(crate) fn new(state: &Arc<SharedState<T>>, closed: Receiver<()>) -> Self {
Self {
state: Arc::downgrade(state),
closed: closed.shared(),
}
}
pub fn closed(&self) -> impl 'static + Future + Send + Unpin {
self.closed.clone()
}
pub fn write(&mut self, value: T) {
self.write_private(Ok(value))
}
fn write_private(&mut self, value: Result<T, Closed>) {
if let Some(state) = self.state.upgrade() {
{
let mut prev = state.last_write.lock().unwrap();
if let Ok(value) = value {
*prev = ChangeValNoWake::Value(value);
} else {
match mem::replace(prev.deref_mut(), ChangeValNoWake::None) {
ChangeValNoWake::None => {
*prev = ChangeValNoWake::Finalized(None);
}
ChangeValNoWake::Value(value) => {
*prev = ChangeValNoWake::Finalized(Some(value));
}
ChangeValNoWake::Finalized(_) => unreachable!(),
}
}
}
state.notify_all();
}
}
}