1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::{
mem,
pin::Pin,
task::{self, Poll},
};
use futures::Stream as FutStream;
use parking_lot::Mutex;
use crate::{
addr::Addr,
context::Source,
envelope::{Envelope, MessageKind},
message::Message,
trace_id::{self, TraceId},
};
pub struct Stream<S>(Mutex<StreamState<S>>);
enum StreamState<S> {
Active(Pin<Box<S>>),
Closed,
}
impl<S> Stream<S> {
pub fn new(stream: S) -> Self {
Stream(Mutex::new(StreamState::Active(Box::pin(stream))))
}
pub fn set(&self, stream: S) {
*self.0.lock() = StreamState::Active(Box::pin(stream));
}
pub fn replace(&self, stream: S) -> Option<S>
where
S: Unpin,
{
let new_state = StreamState::Active(Box::pin(stream));
match mem::replace(&mut *self.0.lock(), new_state) {
StreamState::Active(stream) => Some(*Pin::into_inner(stream)),
StreamState::Closed => None,
}
}
pub fn close(&self) -> bool {
!matches!(
mem::replace(&mut *self.0.lock(), StreamState::Closed),
StreamState::Closed
)
}
}
impl<S> Source for Stream<S>
where
S: FutStream,
S::Item: TracedItem,
{
fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
let mut state = self.0.lock();
let stream = match &mut *state {
StreamState::Active(stream) => stream,
StreamState::Closed => return Poll::Pending,
};
match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(message)) => {
let (trace_id, message) = message.unify();
let kind = MessageKind::Regular { sender: Addr::NULL };
let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();
Poll::Ready(Some(envelope))
}
Poll::Ready(None) => {
*state = StreamState::Closed;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
pub trait TracedItem {
type Message: Message;
fn unify(self) -> (TraceId, Self::Message);
}
impl<M: Message> TracedItem for M {
type Message = M;
#[inline]
fn unify(self) -> (TraceId, M) {
(trace_id::generate(), self)
}
}
impl<M: Message> TracedItem for (TraceId, M) {
type Message = M;
#[inline]
fn unify(self) -> (TraceId, M) {
self
}
}