1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
use protosocket::MessageReactor;
use crate::{
server::{abortion_tracker::AbortionTracker, rpc_responder::RpcResponder, ConnectionService},
Message, ProtosocketControlCode,
};
/// A [`MessageReactor`] that sends RPCs along to a sink.
///
/// Inbound messages are dispatched on their control code: normal messages are
/// handed to the wrapped [`ConnectionService`] together with a responder, and
/// cancel messages are resolved against the abort tracker.
#[derive(Debug)]
pub struct RpcSubmitter<TConnectionService>
where
    TConnectionService: ConnectionService,
{
    /// Service that receives every new RPC and is driven from the reactor's `poll`.
    connection_server: TConnectionService,
    /// Sender lent to each responder so RPC responses can be queued for the connection.
    outbound: spillway::Sender<RpcResponse<<TConnectionService as ConnectionService>::Response>>,
    /// Abort handles looked up by message id when a cancel or a final response is seen.
    aborts: AbortionTracker,
}
impl<TConnectionService> RpcSubmitter<TConnectionService>
where
TConnectionService: ConnectionService,
{
pub fn new(
connection_server: TConnectionService,
outbound: spillway::Sender<RpcResponse<TConnectionService::Response>>,
) -> Self {
Self {
connection_server,
outbound,
aborts: Default::default(),
}
}
}
/// A response message tagged with how `RpcSubmitter::on_outbound_message`
/// should treat it before it is written to the connection.
///
/// Every variant carries the response message itself; the tag only affects
/// the abort-tracker bookkeeping done on the way out.
#[derive(Debug)]
pub enum RpcResponse<T> {
    /// Forwarded as-is; the abort tracker is not touched.
    Partial(T),
    /// Forwarded after the abort entry for the message's id is taken out of
    /// the tracker. A debug line is logged if no entry was present.
    Final(T),
    /// Forwarded as-is; the abort tracker is not touched.
    // NOTE(review): presumably used for responses whose RPC was never
    // registered with the tracker — confirm against the responder code.
    Untracked(T),
}
/// Reactor implementation: routes inbound messages by control code and does
/// abort-tracker bookkeeping for responses on their way out.
impl<TConnectionService> MessageReactor for RpcSubmitter<TConnectionService>
where
    TConnectionService: ConnectionService,
{
    type Inbound = TConnectionService::Request;
    type Outbound = TConnectionService::Response;
    type LogicalOutbound = RpcResponse<TConnectionService::Response>;

    /// Handles one inbound message according to its control code.
    ///
    /// * `Normal` - builds a responder for the message id and hands the
    ///   message to the connection service as a new RPC.
    /// * `Cancel` - takes the abort handle for the message id, if any, and
    ///   marks it aborted.
    /// * `End` - only logged.
    ///
    /// Always returns `ReactorStatus::Continue`.
    fn on_inbound_message(&mut self, message: Self::Inbound) -> protosocket::ReactorStatus {
        // Read the id up front: `message` is moved into the service below.
        let message_id = message.message_id();

        match message.control_code() {
            ProtosocketControlCode::Normal => {
                let responder = RpcResponder::new_responder_reference(
                    &self.outbound,
                    &mut self.aborts,
                    message_id,
                );
                self.connection_server.new_rpc(message, responder);
            }
            ProtosocketControlCode::Cancel => match self.aborts.take_abort(message_id) {
                Some(abort) => {
                    log::debug!("cancelling message {message_id}");
                    abort.mark_aborted();
                }
                None => {
                    log::debug!("received cancellation for untracked message {message_id}");
                }
            },
            ProtosocketControlCode::End => {
                log::debug!("received end message {message_id}");
            }
        }

        protosocket::ReactorStatus::Continue
    }

    /// Unwraps a logical response into the message that goes on the wire.
    ///
    /// A `Final` response first removes the abort entry for its message id;
    /// `Partial` and `Untracked` responses pass straight through.
    fn on_outbound_message(&mut self, response: Self::LogicalOutbound) -> Self::Outbound {
        match response {
            RpcResponse::Final(message) => {
                let was_tracked = self.aborts.take_abort(message.message_id()).is_some();
                if !was_tracked {
                    log::debug!(
                        "final response for untracked message {}",
                        message.message_id()
                    );
                }
                message
            }
            RpcResponse::Partial(message) | RpcResponse::Untracked(message) => message,
        }
    }

    /// Drives the wrapped connection service and returns whatever its `poll` returns.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        context: &mut std::task::Context<'_>,
    ) -> std::ops::ControlFlow<()> {
        // SAFETY: This is a structural pin projection onto a field. As long as
        // `self` is not moved, the connection service inside it is not moved either.
        let pinned_service = unsafe {
            self.as_mut()
                .map_unchecked_mut(|this| &mut this.connection_server)
        };
        pinned_service.poll(context)
    }
}
impl<TConnectionService> RpcSubmitter<TConnectionService>
where
TConnectionService: ConnectionService,
{
// NOTE(review): everything inside this impl block is commented out, so the
// block currently adds no methods to `RpcSubmitter`.
//
// The disabled `poll_advance_streaming_rpcs` below reads
// `self.outstanding_streaming_rpcs`, which is not a field of `RpcSubmitter`
// as declared in this file, so it cannot be re-enabled as written.
// Presumably it is kept as a reference for an earlier streaming design —
// confirm whether it is still needed or can be deleted.
//
// fn poll_advance_streaming_rpcs(
// mut self: Pin<&mut Self>,
// context: &mut Context<'_>,
// ) -> Option<Poll<Result<(), crate::Error>>> {
// if self.outstanding_streaming_rpcs.is_empty() {
// log::trace!("no outstanding streaming rpcs to advance");
// return None;
// }
// while let Poll::Ready(streaming_next) =
// futures::Stream::poll_next(pin!(&mut self.outstanding_streaming_rpcs), context)
// {
// match streaming_next {
// Some((id, AbortableState::Ready(Ok(next)))) => {
// log::debug!("{id} streaming rpc next {next:?}");
// if let Err(_e) = self.outbound.send(next) {
// log::debug!("outbound connection is closed");
// return Some(Poll::Ready(Err(crate::Error::ConnectionIsClosed)));
// }
// }
// Some((id, AbortableState::Ready(Err(e)))) => {
// let abort = self.aborts.remove(&id);
// match e {
// crate::Error::IoFailure(error) => {
// log::warn!("{id} io failure while servicing rpc: {error:?}");
// if let Some(abort) = abort {
// abort.abort();
// }
// }
// crate::Error::CancelledRemotely => {
// log::debug!("{id} rpc cancelled remotely");
// if let Some(abort) = abort {
// abort.abort();
// }
// }
// crate::Error::ConnectionIsClosed => {
// log::debug!("{id} rpc cancelled remotely");
// if let Some(abort) = abort {
// abort.abort();
// }
// }
// crate::Error::Finished => {
// log::debug!("{id} streaming rpc ended");
// if let Some(abort) = abort {
// if let Err(_e) = self
// .outbound
// .send(<TConnectionService::Response as Message>::ended(id))
// {
// log::debug!("outbound connection is closed");
// return Some(Poll::Ready(Err(
// crate::Error::ConnectionIsClosed,
// )));
// }
// abort.mark_aborted();
// }
// }
// }
// }
// Some((id, AbortableState::Abort)) => {
// // This happens when the upstream stuff is dropped and there are no messages that can be produced. We'll send a cancellation.
// log::debug!("{id} streaming rpc abort");
// if let Some(abort) = self.aborts.remove(&id) {
// abort.abort();
// }
// }
// Some((id, AbortableState::Aborted)) => {
// log::debug!("{id} streaming rpc done");
// if let Some(abort) = self.aborts.remove(&id) {
// abort.mark_aborted();
// }
// }
// None => {
// // nothing to wait for
// break;
// }
// }
// }
// None
// }
}