1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
mod checkpoint_generator;
mod request_batcher;
use std::{
cmp::Ordering,
collections::{HashMap, VecDeque},
num::NonZeroUsize,
time::{Duration, Instant},
};
use derivative::Derivative;
use serde::{Deserialize, Serialize};
use shared_ids::{ClientId, RequestId};
use tracing::{debug, trace};
use usig::Usig;
use crate::{
client_request::ClientRequest,
output::{NotReflectedOutput, TimeoutRequest},
peer_message::usig_message::{
checkpoint::CheckpointContent,
view_peer_message::prepare::{Prepare, PrepareContent},
},
Config, RequestPayload, ViewState,
};
use self::{checkpoint_generator::CheckpointGenerator, request_batcher::RequestBatcher};
/// Defines the state of a single client as seen by this replica.
///
/// Contains the ID of the last accepted client request and, if there is one,
/// the request that was received but not yet accepted, together with its ID.
///
/// `Default` is derived via `derivative` with an empty bound so that no
/// `P: Default` bound is required; both fields are `Option`s.
#[derive(Debug, Serialize, Deserialize, Derivative)]
#[derivative(Default(bound = ""))]
struct ClientState<P> {
/// The ID of the last accepted client request, if any request of this
/// client has been accepted so far.
last_accepted_req: Option<RequestId>,
/// The currently processing, not yet accepted client request with its ID.
currently_processing_req: Option<(RequestId, ClientRequest<P>)>,
}
impl<P> ClientState<P> {
    /// Registers a newly received client request in the [ClientState].
    ///
    /// The request becomes the currently processing request only if its ID is
    /// strictly greater than both the ID of the last accepted request and the
    /// ID of the request that is currently being processed (if any).
    ///
    /// # Arguments
    ///
    /// * `request_id` - The ID of the request received.
    /// * `client_req` - The client request received.
    ///
    /// # Return Value
    ///
    /// Returns true if the request was stored as the currently processing
    /// request. Returns false if it was ignored, i.e. if it is not newer than
    /// the last accepted request, or if it is older than or equal to the
    /// currently processing request.
    fn update_upon_request_receival(
        &mut self,
        request_id: RequestId,
        client_req: ClientRequest<P>,
    ) -> bool {
        let sender = client_req.client;

        // Requests that are not newer than the last accepted one are stale.
        if self.last_accepted_req >= Some(request_id) {
            trace!("Ignored request to update client state with an old client request with ID {:?} from client with ID {:?}: last accepted request of the same client had ID {:?}.", request_id, sender, self.last_accepted_req);
            return false;
        }

        // Compare against the request that is currently in flight, if any.
        let pending_id = self.currently_processing_req.as_ref().map(|(id, _)| *id);
        let is_newer = match pending_id.map(|pending| (pending, request_id.cmp(&pending))) {
            None => {
                trace!(
                    "Updated client state with client request with ID {:?} from client with ID {:?}: no request was currently being processed of the same client.",
                    request_id, sender
                );
                true
            }
            Some((pending, Ordering::Less)) => {
                trace!("Ignored request to update client state with an old client request with ID {:?} from client with ID {:?}: currently processing request of the same client is newer, has ID {:?}.", request_id, sender, pending);
                false
            }
            Some((_, Ordering::Equal)) => {
                // The very same request was seen before.
                trace!("Ignored request to update client state with client request with ID {:?} from client with ID {:?} which was already previously received and is being processed.", request_id, sender);
                false
            }
            Some((pending, Ordering::Greater)) => {
                trace!("Updated client state with client request with ID {:?} from client with ID {:?}: received request is newer than the currently processing one with ID {:?} of the same client.", request_id, sender, pending);
                true
            }
        };

        if is_newer {
            self.currently_processing_req = Some((request_id, client_req));
        }
        is_newer
    }

    /// Records the completion (acceptance) of a client request.
    ///
    /// The completed request must have a higher ID than the last accepted
    /// request; otherwise nothing is changed.
    /// The currently processing request is cleared if its ID is lower than or
    /// equal to the ID of the completed request.
    ///
    /// # Arguments
    ///
    /// * `request_id` - The ID of the request that was completed.
    ///
    /// # Return Value
    ///
    /// Returns false if a request with the same or a higher ID was already
    /// accepted, otherwise true.
    fn update_upon_request_completion(&mut self, request_id: RequestId) -> bool {
        let already_accepted = self.last_accepted_req >= Some(request_id);
        if already_accepted {
            trace!("Failed to update client state regarding the completion of client request (ID: {:?}): ID of last accepted request from the same client is greater than or equal to the receiving request's ID.", request_id);
            return false;
        }

        self.last_accepted_req = Some(request_id);

        // The in-flight request is obsolete once an equal or newer one completed.
        let superseded = matches!(
            &self.currently_processing_req,
            Some((pending_id, _)) if *pending_id <= request_id
        );
        if superseded {
            self.currently_processing_req = None;
        }
        true
    }
}
/// The purpose of the struct is to process and accept client requests.
///
/// It tracks the state of each client, the requests that were received but
/// not yet accepted, and the number of accepted Prepares.
#[derive(Debug)]
pub(crate) struct RequestProcessor<P: RequestPayload, U: Usig> {
/// Collects the ClientState of each ClientId.
clients_state: HashMap<ClientId, ClientState<P>>,
/// Collects the currently processing client requests.
/// Additionally to the ID of the request and the client request itself,
/// the time of arrival of the request is kept track of.
/// New requests are pushed to the back; accepted ones are popped from the
/// front.
currently_processing_reqs: VecDeque<(RequestId, ClientRequest<P>, Instant)>,
/// Used for batching requests.
pub(crate) request_batcher: RequestBatcher<P>,
/// Used for possibly generating a Checkpoint when sufficient requests have
/// been accepted.
pub(crate) checkpoint_generator: CheckpointGenerator<P, U>,
/// The number of accepted Prepares; starts at zero and is incremented once
/// per call to `accept_prepare`.
round: u64,
}
impl<P: RequestPayload, U: Usig> RequestProcessor<P, U>
where
U::Attestation: Clone,
U::Signature: Clone + Serialize,
{
/// Create a new RequestProcessor with the given
/// batch timeout and the max size for a batch.
pub(crate) fn new(
batch_timeout_duration: Duration,
batch_max_size: Option<NonZeroUsize>,
) -> Self {
RequestProcessor {
clients_state: HashMap::new(),
currently_processing_reqs: VecDeque::new(),
request_batcher: RequestBatcher::new(batch_timeout_duration, batch_max_size),
checkpoint_generator: CheckpointGenerator::new(),
round: 0,
}
}
/// Processes a client request.
///
/// May return a client timeout, a [Prepare], and/or a batch timeout.
///
/// The request is first registered in the state of its client. If it is not
/// newer than what is already known for that client, it is ignored and
/// nothing is returned. Otherwise it is appended to the queue of currently
/// processing requests together with its time of arrival.
///
/// # Arguments
///
/// * `client_request` - The client request to be processed.
/// * `view_state` - The inner state of the replica.
/// * `client_timeout_duration` - The duration of the client timeout.
/// * `config` - The configuration of the replica.
///
/// # Return Value
///
/// A tuple `(client timeout, Prepare content, batch timeout)`:
///
/// * The request to start a client timeout is returned whenever the client
///   request was newly registered.
/// * The content of a [Prepare] is returned when the replica is in a view,
///   is the primary of that view, and the request batcher handed out a
///   batch.
/// * The batch timeout request produced by the request batcher is returned
///   whenever the replica is in a view and is the primary of that view.
///
/// All three are [None] if the client request was ignored. The latter two
/// are [None] if the replica is not the primary or is changing views.
pub(crate) fn process_client_req(
    &mut self,
    client_request: ClientRequest<P>,
    view_state: &ViewState<P, U::Signature>,
    client_timeout_duration: Duration,
    config: &Config,
) -> (
    Option<TimeoutRequest>,
    Option<PrepareContent<P>>,
    Option<TimeoutRequest>,
) {
    // Captured up front so that the request itself can be moved into the
    // batcher further below instead of being cloned a third time.
    let request_id = client_request.id();
    let client_id = client_request.client;

    trace!(
        "Processing client request (ID {:?}, client ID: {:?}) ...",
        request_id,
        client_id
    );

    trace!("Updating state of client (ID: {:?}) ...", client_id);
    let newly_registered = self
        .clients_state
        .entry(client_id)
        .or_default()
        .update_upon_request_receival(request_id, client_request.clone());
    if !newly_registered {
        return (None, None, None);
    }

    self.currently_processing_reqs.push_back((
        request_id,
        client_request.clone(),
        Instant::now(),
    ));

    let start_client_timeout = Some(TimeoutRequest::new_start_client_req(
        client_id,
        client_timeout_duration,
    ));

    let (prepare_content, batch_timeout_request) = match view_state {
        // Only the primary of the current view batches requests.
        ViewState::InView(in_view) if config.me_primary(in_view.view) => {
            let (request_batch, timeout_request) = self.request_batcher.batch(client_request);
            let prepare_content = request_batch.map(|batch| PrepareContent {
                view: in_view.view,
                origin: config.me(),
                request_batch: batch,
            });
            (prepare_content, Some(timeout_request))
        }
        ViewState::InView(_) => (None, None),
        // Client messages are ignored when the replica is in the state of
        // changing Views.
        ViewState::ChangeInProgress(in_progress) => {
            trace!("Ignored possible (if replica is primary) creation of Prepare as replica is in the process of changing views (from: {:?}, to: {:?}).", in_progress.prev_view, in_progress.next_view);
            (None, None)
        }
    };

    trace!(
        "Processed client request (ID: {:?}, client ID: {:?}).",
        request_id,
        client_id
    );
    (start_client_timeout, prepare_content, batch_timeout_request)
}
/// Iterates over all tracked clients that have a currently processing
/// client request and yields that request together with its RequestId.
///
/// Clients without a currently processing request are skipped.
pub(super) fn currently_processing_all(
    &self,
) -> impl Iterator<Item = (RequestId, &ClientRequest<P>)> {
    self.clients_state.values().filter_map(|client_state| {
        client_state
            .currently_processing_req
            .as_ref()
            .map(|(request_id, request)| (*request_id, request))
    })
}
/// Accepts the provided [Prepare] by accepting every client request of its
/// request batch, in batch order, and increments the round counter.
///
/// # Arguments
///
/// * `config` - The configuration of the replica.
/// * `prepare` - The accepted [Prepare].
/// * `timeout_duration` - The current set timeout duration; forwarded to
///   the acceptance of each request of the batch.
/// * `output` - The output struct to be adjusted in case of, e.g., errors
///   or responses.
///
/// # Return Value
///
/// The CheckpointContent if a Checkpoint is generated (see
/// [CheckpointGenerator]).
pub(crate) fn accept_prepare(
    &mut self,
    config: &Config,
    prepare: Prepare<P, U::Signature>,
    timeout_duration: Duration,
    output: &mut NotReflectedOutput<P, U>,
) -> Option<CheckpointContent> {
    self.round += 1;
    debug!("Accepting batch of Prepares ...");

    // The batch is cloned since the Prepare itself is still needed for the
    // checkpoint generation below.
    let batched_requests = prepare.request_batch.clone();
    for accepted in batched_requests {
        self.accept_request(accepted, timeout_duration, output);
    }
    trace!("Accepted batch of Prepares.");

    let generator = &mut self.checkpoint_generator;
    generator.generate_checkpoint(&prepare, config)
}
/// Returns the round, i.e. the amount of accepted [Prepare]s.
///
/// The counter starts at zero and is incremented once per call to
/// `accept_prepare`.
pub(crate) fn round(&self) -> u64 {
self.round
}
/// Accepts the provided request and responds to the respective client.
///
/// Nothing happens if a request with the same or a higher ID was already
/// accepted for the client. Otherwise, in this order:
///
/// 1. a request to stop the client timeout of the client is emitted,
/// 2. already accepted requests are dropped from the front of the queue of
///    currently processing requests,
/// 3. the response to the client is emitted,
/// 4. if a not yet accepted request remains at the front of the queue, a
///    request to start a client timeout for it is emitted.
///
/// The new timeout is started with the full `curr_full_timeout_duration`;
/// the stored time of arrival of the pending request is not consulted here.
///
/// # Arguments
///
/// * `request` - The accepted client request.
/// * `curr_full_timeout_duration` - The current set timeout duration.
/// * `output` - The output struct to be adjusted in case of, e.g., errors
///   or responses.
fn accept_request(
    &mut self,
    request: ClientRequest<P>,
    curr_full_timeout_duration: Duration,
    output: &mut NotReflectedOutput<P, U>,
) {
    trace!(
        "Accepting client request (ID: {:?}, client ID: {:?}) ...",
        request.id(),
        request.client
    );

    // Update state of the client from which the request is.
    let newly_accepted = self
        .clients_state
        .entry(request.client)
        .or_default()
        .update_upon_request_completion(request.payload.id());
    if !newly_accepted {
        return;
    }

    // Send request to stop timeout that may be set for this now accepted client-request.
    output.timeout_request(TimeoutRequest::new_stop_client_req(request.client));

    // Update data structure of currently processing requests.
    // Possibly create a new timeout request with the reset duration.
    let mut start_client_timeout = None;
    while let Some((oldest_id, oldest_req, _)) = self.currently_processing_reqs.front() {
        // A request is still pending only if its client is known and its ID
        // is greater than the ID of the last accepted request of that client.
        let still_pending = self
            .clients_state
            .get(&oldest_req.client)
            .map_or(false, |state| Some(*oldest_id) > state.last_accepted_req);

        if still_pending {
            // The oldest request in the data structure has not yet been accepted.
            // Send a request to start a timeout for it with the reset duration.
            start_client_timeout = Some(TimeoutRequest::new_start_client_req(
                oldest_req.client,
                curr_full_timeout_duration,
            ));
            break;
        }

        // It was already accepted.
        self.currently_processing_reqs.pop_front();
    }

    trace!(
        "Accepted client request (ID: {:?}, client ID: {:?})",
        request.id(),
        request.client
    );
    output.response(request.client, request.payload);
    if let Some(start_client_request) = start_client_timeout {
        output.timeout_request(start_client_request);
    }
}
}