1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use crate::{
server::{abortable::AbortableState, rpc_submitter::RpcResponse},
Message,
};
pub struct ForwardAbortableStreamingRpc<S, T>
where
S: Stream<Item = AbortableState<crate::Result<T>>>,
T: Message,
{
stream: S,
id: u64,
forward: spillway::Sender<RpcResponse<T>>,
completed_for_drop: bool,
}
impl<S, T> Drop for ForwardAbortableStreamingRpc<S, T>
where
S: Stream<Item = AbortableState<crate::Result<T>>>,
T: Message,
{
fn drop(&mut self) {
if !self.completed_for_drop {
log::debug!("dropping unary rpc before completion: {}", self.id);
let _ = self.forward.send(RpcResponse::Final(T::cancelled(self.id)));
}
}
}
impl<S, T> ForwardAbortableStreamingRpc<S, T>
where
S: Stream<Item = AbortableState<crate::Result<T>>>,
T: Message,
{
pub fn new(stream: S, id: u64, forward: spillway::Sender<RpcResponse<T>>) -> Self {
Self {
stream,
id,
forward,
completed_for_drop: false,
}
}
fn complete_for_drop(self: Pin<&mut Self>) {
// SAFETY: This is a structural pin. If I'm not moved then neither is this boolean (it was an invariant for the future anyway).
unsafe {
self.get_unchecked_mut().completed_for_drop = true;
}
}
}
impl<S, T> Future for ForwardAbortableStreamingRpc<S, T>
where
S: Stream<Item = AbortableState<crate::Result<T>>>,
T: Message,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
loop {
// SAFETY: This is a structural pin. If I'm not moved then neither is this future.
break match unsafe { self.as_mut().map_unchecked_mut(|me| &mut me.stream) }
.poll_next(context)
{
Poll::Ready(state) => {
match state {
Some(AbortableState::Ready(Ok(response))) => {
log::trace!("{} unary rpc response", self.id);
if let Err(_e) = self.forward.send(RpcResponse::Partial(response)) {
log::debug!("outbound connection is closed");
}
continue;
}
Some(AbortableState::Ready(Err(e))) => {
match e {
crate::Error::IoFailure(error) => {
log::warn!(
"{} io failure while servicing rpc: {error:?}",
self.id
);
let _ = self
.forward
.send(RpcResponse::Final(T::cancelled(self.id)));
}
crate::Error::CancelledRemotely => {
log::debug!("{} rpc cancelled remotely", self.id);
let _ = self
.forward
.send(RpcResponse::Final(T::cancelled(self.id)));
}
crate::Error::ConnectionIsClosed => {
log::debug!("{} rpc cancelled remotely", self.id);
let _ = self
.forward
.send(RpcResponse::Final(T::cancelled(self.id)));
}
crate::Error::Finished => {
log::debug!("{} unary rpc ended", self.id);
if let Err(_e) =
self.forward.send(RpcResponse::Final(T::ended(self.id)))
{
log::debug!("outbound connection is closed");
}
}
}
self.complete_for_drop();
Poll::Ready(())
}
Some(AbortableState::Abort) => {
// This happens when the upstream stuff is dropped and there are no messages that can be produced. We'll send a cancellation.
log::debug!("{} unary rpc abort", self.id);
let _ = self.forward.send(RpcResponse::Final(T::cancelled(self.id)));
self.complete_for_drop();
Poll::Ready(())
}
Some(AbortableState::Aborted) => {
log::debug!("{} unary rpc was cancelled by the client.", self.id);
let _ = self.forward.send(RpcResponse::Final(T::cancelled(self.id)));
self.complete_for_drop();
Poll::Ready(())
}
None => {
log::debug!("{} streaming rpc reached the end", self.id);
let _ = self.forward.send(RpcResponse::Final(T::ended(self.id)));
self.complete_for_drop();
Poll::Ready(())
}
}
}
Poll::Pending => Poll::Pending,
};
}
}
}