1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use super::*;
/// A builder that lets you prepare a connection before [`Self::finish`]ing it to attempt to
/// open the connection.
///
/// `OnReceiveFactory` is the type of the handler stored in `on_receive`; it is `()` by default
/// and is replaced by [`ConnectionBuilder::on_receive_factory`]. `OnReceiveOverload` is only
/// carried at the type level (through `marker`) and is the parameter given to
/// `SingleCandidateHandler` by the finishing methods.
#[must_use = "A ConnectionBuilder must be `finish`ed to actually establish a connection"]
pub struct ConnectionBuilder<'a, OnReceiveFactory = (), OnReceiveOverload = ()> {
/// The bus through which the connection attempt is issued.
pub(crate) bus: &'a Bus,
/// The peer passed to the bus when connecting.
pub(crate) peer: PeerPubkey,
/// The topic to connect on. When `None`, finishing the builder reports
/// `ConnectionError::ConnectionRejected` without issuing a connection attempt.
pub(crate) topic: Option<bus::Topic>,
/// The reliability setting passed to the bus when connecting.
pub(crate) reliability: bus::Reliability,
/// The handler applied to the `StreamCandidate` of an accepted connection.
pub(crate) on_receive: OnReceiveFactory,
/// Zero-sized marker that ties `OnReceiveOverload` to the builder's type.
pub(crate) marker: core::marker::PhantomData<OnReceiveOverload>,
}
impl<'a> ConnectionBuilder<'a> {
    /// Installs the [`StreamOpenOnce`] factory that will handle the incoming
    /// [`StreamCandidate`] once it is constructed.
    ///
    /// Setting a factory is optional: you may still
    /// finish<sup>[(0)](ConnectionBuilder::finish)[(1)](ConnectionBuilder::finish_with)[(2)](ConnectionBuilder::finish_async)</sup>
    /// your [`ConnectionBuilder`] without one. In that case, your handler receives a
    /// [`StreamCandidate`] rather than the already open [`Stream`] that a factory would have
    /// provided.
    ///
    /// The builder is consumed and returned with its type updated to carry the new factory;
    /// the bus, peer, topic and reliability are carried over unchanged.
    pub fn on_receive_factory<OnReceiveOverload, OnReceiveFactory>(
        self,
        on_recv_factory: OnReceiveFactory,
    ) -> ConnectionBuilder<'a, OnReceiveFactory, OnReceiveOverload> {
        // The previous `on_receive` and `marker` are unit-like placeholders: they are simply
        // left behind while every other field moves into the re-typed builder.
        ConnectionBuilder {
            bus: self.bus,
            peer: self.peer,
            topic: self.topic,
            reliability: self.reliability,
            on_receive: on_recv_factory,
            marker: core::marker::PhantomData,
        }
    }
}
impl<'a, OnReceiveFactory: SingleCandidateHandler<OnReceiveOverload>, OnReceiveOverload>
    ConnectionBuilder<'a, OnReceiveFactory, OnReceiveOverload>
{
    /// Sets the reliability mode for the connection.
    ///
    /// The default is [`ReliabilityMode::Reliable`].
    pub fn reliability(self, mode: bus::Reliability) -> Self {
        Self {
            reliability: mode,
            ..self
        }
    }

    /// Attempts to open the connection, calling the `handler`'s continuation once opening has
    /// either succeeded or failed.
    ///
    /// On acceptance, the builder's receive handler is applied to the resulting
    /// [`StreamCandidate`] and its output is delivered as `Ok`; on rejection, the error is
    /// delivered as `Err`. If the builder has no topic, `Err(ConnectionError::ConnectionRejected)`
    /// is delivered immediately and no connection attempt is issued.
    ///
    /// # [`IntoChannel`]
    /// The signature of this method is built to let you pass various handlers for the events
    /// that will follow:
    /// - You may pass closures directly.
    /// - You may also pass channel tuples, such as `std::sync::mpsc::channel()`. The returned value
    ///   will wrap your channel's receiver end into the handle that keeps your binding alive.
    pub fn finish<OverloadId, Handler>(self, handler: Handler) -> Handler::Receiver
    where
        Handler: IntoChannel<OverloadId, Result<OnReceiveFactory::Output, ConnectionError>>,
        Handler::Sender:
            OneShotChannel<OverloadId, Result<OnReceiveFactory::Output, ConnectionError>>,
    {
        let Self {
            bus,
            peer,
            topic,
            reliability,
            on_receive,
            marker: _,
        } = self;
        let (sender, receiver) = handler.into_channel();

        // Without a topic there is nothing to connect to: report the failure right away.
        let topic = match topic {
            Some(topic) => topic,
            None => {
                sender.send_infallibly_once(Err(ConnectionError::ConnectionRejected));
                return receiver;
            }
        };

        // The callback is invoked through a reusable closure type, so the one-shot sender and
        // the receive handler are parked in an `Option` and taken out on the single invocation.
        let mut pending = Some((sender, on_receive));
        let on_resolved = Box::new(move |resolution: bus::ConnectionResult| {
            let (sender, factory) = match pending.take() {
                Some(parts) => parts,
                None => unreachable!("Connections only resolve once."),
            };
            sender.send_infallibly_once(match resolution {
                bus::ConnectionResult::Rejected(err) => Err(err),
                bus::ConnectionResult::Accepted(candidate) => {
                    Ok(factory.open_once(StreamCandidate { inner: candidate }))
                }
            })
        })
        .into();

        bus.bus
            .inner
            .connect(peer, (&*topic).into(), reliability, on_resolved);
        receiver
    }

    /// Attempts to open the connection, calling `on_connect` once opening has either
    /// succeeded or failed.
    ///
    /// # Relation to [`ConnectionBuilder::finish`]
    /// This method compensates for a weakness in Rust's type inference: when you pass a closure to
    /// [`ConnectionBuilder::finish`], Rust will generally require that you explicitly type its
    /// arguments, despite being able to compute the only valid type for its argument (as
    /// highlighted if you purposely mis-type the argument).
    ///
    /// Hence, it essentially exists to highlight this papercut and to let you _still_ not worry
    /// about typing your closures.
    pub fn finish_with<F>(self, on_connect: F)
    where
        F: FnOnce(Result<OnReceiveFactory::Output, ConnectionError>) + Send + Sync + 'static,
    {
        self.finish(on_connect)
    }

    /// Returns a future that resolves once the connection has been established or determined to
    /// fail.
    ///
    /// The connection attempt itself is started by this call (through
    /// [`ConnectionBuilder::finish`] with a one-shot channel); the returned future only waits
    /// for its result.
    pub fn finish_async(self) -> ConnectionFuture<OnReceiveFactory::Output>
    where
        OnReceiveFactory::Output: Send + 'static,
    {
        let receiver = self.finish(tokio::sync::oneshot::channel());
        ConnectionFuture {
            rx: receiver.into_future(),
        }
    }
}
/// A future that resolves once a connection is established or has failed.
///
/// If the sending half of the underlying channel is dropped before any result is sent, the
/// future resolves to `Err(ConnectionError::PeerNotFound)`.
///
/// # Cancel-safety
/// This future is cancel-safe, however constructing in a loop would be ill-advised.
pub struct ConnectionFuture<T> {
/// Receiving half of the one-shot channel on which the connection result is delivered.
rx: <tokio::sync::oneshot::Receiver<Result<T, ConnectionError>> as core::future::IntoFuture>::IntoFuture
}
impl<T> core::future::Future for ConnectionFuture<T> {
    type Output = Result<T, ConnectionError>;

    /// Polls the underlying one-shot receiver.
    ///
    /// A received value is forwarded as-is; a receive error (no value could be obtained from
    /// the channel) is reported as `Err(ConnectionError::PeerNotFound)`.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.get_mut();
        let receiver = std::pin::Pin::new(&mut this.rx);
        core::future::Future::poll(receiver, cx).map(|received| match received {
            Ok(outcome) => outcome,
            Err(_) => Err(ConnectionError::PeerNotFound),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a [`ConnectionFuture`] forwards whatever was sent on its channel, and
    /// reports `PeerNotFound` when the sender is dropped without sending.
    #[tokio::test]
    async fn connection_future() {
        let outcomes = [
            Ok(()),
            Err(ConnectionError::HandshakeFailed),
            Err(ConnectionError::HandshakeTimedOut),
            Err(ConnectionError::ConnectionRejected),
            Err(ConnectionError::PeerNotFound),
        ];
        // Every outcome sent through the channel must come out of the future unchanged.
        for expected in outcomes {
            let (sender, rx) = tokio::sync::oneshot::channel();
            _ = sender.send(expected);
            assert_eq!(expected, ConnectionFuture { rx }.await);
        }

        // A sender dropped without sending resolves the future to `PeerNotFound`.
        let (sender, rx) = tokio::sync::oneshot::channel();
        drop(sender);
        assert_eq!(
            Err::<(), _>(ConnectionError::PeerNotFound),
            ConnectionFuture { rx }.await
        );
    }
}