//! This documentation mainly serves to:
//! * Explain how to implement protocols on top of the framework. See the
//!   [`protocol::unreplicated`] module for a beginner example.
//! * Explain how to run evaluations with this codebase. See the provided
//!   binaries for reference.
//! * Record the reasoning behind design choices, which helps me think
//!   consistently over a long development period.
//!
//! The detailed implementations of the various protocols and applications are
//! mostly undocumented; refer to the original works for them.
//!
//! # Stability
//!
//! At the time of writing, we are around the release candidate of version 1.0.
//! I have tried out most alternative architectures and components, and I
//! believe that most of what remains here is here for a reason.
//!
//! As a result, hopefully there will be no major breaking update to the
//! codebase, i.e. everything in the [`facade`] module remains the same forever.
//! Future work should be:
//! * Add more protocol and application implementations and evaluate them.
//! * Add more framework components, e.g. a kernel network stack, if necessary.
//! * Bump toolchain and dependency versions.
/// Interfaces across top-level modules, and to the outside.
///
/// The general architecture follows [specpaxos], with the following mapping:
/// * `Transport`: [`Transport`](facade::Transport) and
///   [`TxAgent`](facade::TxAgent)
/// * `TransportReceiver`: [`Receiver`](facade::Receiver) and `rx_agent` closure
/// * `Configuration`: [`Config`](facade::Config)
/// * `TransportAddress`: [`Transport::Address`](facade::Transport::Address)
/// * `AppReplica`: [`App`](facade::App)
/// * `Client`: [`Invoke`](facade::Invoke)
///
/// (There is nothing corresponding to `Replica` right now; replica receivers
/// interact with applications directly.)
///
/// [specpaxos]: https://github.com/UWSysLab/specpaxos
///
/// There are some modifications to let us work with Rust's borrow and lifetime
/// system, but it should be possible to organize the code of every
/// implementation in the same way as in specpaxos.
///
/// Additionally, the [`AsyncEcosystem`](facade::AsyncEcosystem) trait allows
/// receivers to work asynchronously, which is probably required by all
/// `Invoke`able receivers. Its multithreading counterpart [`stage`] is designed
/// as a fixed-implementation module, and stays outside of the facade.
/// Low-level DPDK binding.
///
/// For practical usage consider [`framework::dpdk`], which is a higher-level
/// DPDK interface based on this module.
///
/// This shim module contains two parts: a set of `extern "C"` DPDK function
/// declarations, and a custom L2 packet layout.
///
/// The extern functions whose names start with `rte_` are present in DPDK's
/// shared objects and are linked directly. The other functions, whose names
/// start with `oskr_`, wrap static inline DPDK functions that are defined in C
/// header files, so this codebase provides a custom stub in `dpdk_shim.c` to
/// link against.
///
/// There is also a `setup_port` function, which calls several DPDK functions
/// under the hood to set up an Ethernet device port properly.
///
/// # DPDK transport packet format
///
/// By default, DPDK does not perform any TCP/IP network stack processing. If we
/// leave these protocol layers out, the packet parsing/assembling step can be
/// omitted and the transport packets become smaller.
///
/// The DPDK transport packet layout takes 17 bytes. The first 14 bytes are a
/// normal Ethernet header: 6 bytes of destination MAC address, 6 bytes of
/// source MAC address, and 2 bytes of EtherType, which is set to `0x88d5`. Then
/// there is 1 byte of destination local id and 1 byte of source local id.
/// Finally, there is 1 byte of id extension field. A local id pair is encoded
/// as follows:
/// * Each local id field contains the low 7 bits of the id.
/// * If the highest bit of a local id field is 1, then the id is a 15-bit
///   "large" id, and the extension field contains its high 8 bits. Otherwise,
///   it is a 7-bit "small" id.
/// * At most one of the two local ids is large. If both of them are small, the
///   extension field is unused.
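///
/// The encoding rules above can be sketched as follows. This is only an
/// illustration, not the code used by this crate: the names `encode_id_pair`
/// and `decode_id_pair` are hypothetical, and the sketch assumes that an id
/// below 128 is always sent as a small id and that the extension byte is zero
/// when unused.
///
/// ```rust
/// // hypothetical helper: encode a (destination, source) local id pair as the
/// // three bytes after the Ethernet header: destination, source, extension
/// fn encode_id_pair(dst: u16, src: u16) -> Option<[u8; 3]> {
///     const SMALL_MAX: u16 = 0x7f; // 7 bits
///     const LARGE_MAX: u16 = 0x7fff; // 15 bits
///     // a large id keeps its low 7 bits in the field and sets the highest bit
///     let large_field = |id: u16| 0x80 | (id & 0x7f) as u8;
///     match (dst <= SMALL_MAX, src <= SMALL_MAX) {
///         (true, true) => Some([dst as u8, src as u8, 0]),
///         (false, true) if dst <= LARGE_MAX => {
///             Some([large_field(dst), src as u8, (dst >> 7) as u8])
///         }
///         (true, false) if src <= LARGE_MAX => {
///             Some([dst as u8, large_field(src), (src >> 7) as u8])
///         }
///         // both ids are large, or an id does not fit into 15 bits
///         _ => None,
///     }
/// }
///
/// // hypothetical helper: the inverse of `encode_id_pair`
/// fn decode_id_pair([dst, src, ext]: [u8; 3]) -> Option<(u16, u16)> {
///     if dst & 0x80 != 0 && src & 0x80 != 0 {
///         return None; // at most one id of the pair may be large
///     }
///     let widen = |field: u8| {
///         let low = u16::from(field & 0x7f);
///         if field & 0x80 != 0 {
///             u16::from(ext) << 7 | low
///         } else {
///             low
///         }
///     };
///     Some((widen(dst), widen(src)))
/// }
///
/// fn main() {
///     // small destination 5, large source 300 = (2 << 7) | 44
///     let bytes = encode_id_pair(5, 300).unwrap();
///     assert_eq!(bytes, [5, 0x80 | 44, 2]);
///     assert_eq!(decode_id_pair(bytes), Some((5, 300)));
///     // two large ids cannot share the single extension byte
///     assert_eq!(encode_id_pair(300, 300), None);
/// }
/// ```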
///
/// The transport packet format can be transferred with normal L2 forwarding,
/// and supports up to 128 local ids on the small side and up to 32768 local ids
/// on the large side. This allows users to run a large number of clients on the
/// same machine. Notice that the format actually allows more than 128 servers,
/// i.e. well-known addresses: just use different MAC addresses. The format is
/// also compatible with L2 multicast.
///
/// The functions for accessing and manipulating the packet format are defined
/// in [`rte_mbuf`](dpdk_shim::rte_mbuf).
/// Simulated facilities for writing test cases.
/// Stage abstraction. A receiver on stage can use multiple threads efficiently.
///
/// Strictly speaking, this is part of [`framework`]. It is kept in its own
/// module to emphasize its importance.
///
/// # Why reinvent the wheel?
///
/// There are good enough thread pool libraries from the Rust community.
/// [Rayon], for example, is one of the most popular, and it could be used in
/// this codebase. Alternatively, async-based libraries are also good for
/// multithreading in some ways.
///
/// [Rayon]: https://docs.rs/rayon/latest/rayon/
///
/// Building a new stage is not about extreme performance, and there is no plan
/// to perform special optimization in the first place. The requirement is more
/// about a specialized interface, which:
/// * Makes a distinction between *stateful* tasks and *stateless* tasks. Most
///   protocol implementations are centralized around "one big state", and
///   traditionally they run in a single-threaded context.
///
///   The purpose of the stage is to annotate stateless snippets, which gives
///   the runtime a chance to run them concurrently and dramatically increase
///   performance. This is conceptually different from existing worker pool
///   models, which assume most tasks are independent and treat stateful tasks
///   as exceptions.
///
///   In practice, this stage module has minimal effect on an implementation's
///   logical structure. No lock or channel is required.
/// * Provides task priority. Certain protocols may rely on dynamic priority
///   semantics, i.e. determining the task-processing order upon submission,
///   which cannot be expressed as a simple FIFO or LIFO strategy.
/// * Provides timed tasks. This interface is absent in Rayon, and although it
///   is present in async libraries, they may not guarantee an efficient
///   implementation of resetting deadlines, which is widely required by timers
///   related to view change.
///
/// Additionally, providing a standard stage for server-side receivers serves as
/// a supplement to the *lightweight RX agent* rule of
/// [`Transport`](facade::Transport). It is recommended that all server-side
/// receivers:
/// * Be on stage.
/// * Submit as soon as possible in the RX agent.
///
/// # Why conditional compilation?
///
/// In this codebase we try to avoid conditional compilation in most places.
/// Instead, we abstract aspects of the runtime into traits in [`facade`].
/// However, the stage module does use conditional compilation: in the test
/// profile the stage module is mocked by an asynchronous polyfill from the
/// [`simulated`] module.
///
/// The main reason for this design is that I believe there will be only one
/// "suitable" stage implementation for all scenarios except testing. In
/// contrast, the transport may be DPDK-based or kernel-based, and an async
/// ecosystem based on Tokio or smol may not be suitable for benchmarks. Making
/// abstractions is fun, but more abstraction means more learning effort for
/// users, so skipping it here should be better.
///
/// **The shortcoming of conditional compilation.** The simulated polyfill
/// implementation only promises to work when paired with the async ecosystem
/// from [`framework::tokio`]. Because of conditional compilation there is no
/// way to enforce this pairing (Rust's generic specialization is still on its
/// way).
///
/// # Why callback hell?
///
/// [`Submit`](stage::Submit), the core interface of the stage module, puts each
/// submitted task one level deeper in a closure. There have been lots of
/// efforts to avoid such callback hell, such as promises and async syntax.
/// These solutions normally only work well for sequential logic. However,
/// implementations usually submit conditionally, or even submit in a loop, for
/// example sending replies while executing a batch. As far as I know, a
/// callback closure is still the most natural way to express such cases.
///
/// # Example: sample sort
///
/// ```
/// # use std::{sync::{Arc, atomic::{AtomicBool, Ordering}}, iter::once};
/// # use oskr::stage::{Handle, Submit, State};
/// # use rand::{Rng, thread_rng, distributions::Uniform, seq::SliceRandom};
/// // sort a list of u32 and merge it into state (only works once)
/// // invariant: state vec is always sorted
/// struct SortingState(Vec<u32>);
///
/// impl State for SortingState {
///     type Shared = (); // nothing needs to be shared here
///     fn shared(&self) -> Self::Shared {}
/// }
///
/// // these are free functions because documentation examples are compiled
/// // outside the crate. normally they could be methods of
/// // `StatelessContext<SortingState>`
/// fn sort(submit: &Submit<SortingState>, list: Vec<u32>, p: usize, completed: Arc<AtomicBool>) {
///     let n = list.len();
///     submit.stateless(move |shared| sort_internal(&shared.submit, list, p, n, completed));
/// }
///
/// fn sort_internal(
///     submit: &Submit<SortingState>,
///     mut list: Vec<u32>,
///     p: usize,
///     n: usize,
///     completed: Arc<AtomicBool>,
/// ) {
///     if list.is_empty() {
///         return;
///     }
///     // hardcoded small sort threshold for simplicity
///     if list.len() <= 32 {
///         list.sort_unstable();
///         submit.stateful(move |state| {
///             let position = state
///                 .0
///                 .binary_search(list.last().unwrap())
///                 .unwrap_or_else(|p| p);
///             state.0.splice(position..position, list);
///             if state.0.len() == n {
///                 completed.store(true, Ordering::SeqCst);
///             }
///         });
///         return;
///     }
///
///     let mut sample_list: Vec<_> = list
///         .choose_multiple(&mut thread_rng(), p - 1)
///         .cloned()
///         .collect();
///     sample_list.sort_unstable();
///     sample_list.push(u32::MAX);
///     for (low, high) in once(&u32::MIN).chain(&sample_list).zip(&sample_list) {
///         let list = list
///             .iter()
///             .filter(|n| (low..high).contains(n))
///             .cloned()
///             .collect();
///         let completed = completed.clone();
///         submit.stateless(move |shared| sort_internal(&shared.submit, list, p, n, completed));
///     }
/// }
///
/// fn main() {
///     let list: Vec<_> = thread_rng()
///         .sample_iter(Uniform::new(u32::MIN, u32::MAX))
///         .take(1000)
///         .collect();
///     let mut expected = list.clone();
///     expected.sort_unstable();
///     let state = Handle::from(SortingState(Vec::new()));
///     let completed = Arc::new(AtomicBool::new(false));
///     // can be either `with_stateless` or `with_stateful`, we only need to
///     // access the injected `submit` property
///     state.with_stateless({
///         let completed = completed.clone();
///         move |shared| sort(&shared.submit, list, 2, completed)
///     });
///     // donate the current thread to the stage for executing tasks
///     // you can donate more threads to speed it up
///     state.run_worker(move || completed.load(Ordering::SeqCst));
///     state.with_stateful(|state| assert_eq!(state.0, expected));
/// }
/// ```
// for production
/// Common definitions, extracted so that future refactoring is easier.
///
/// # Difference between `common` and `framework`
///
/// The common module is more specification-like, while the [`framework`] module
/// contains more implementation. For example, we keep the
/// [`ReplicaId`](common::ReplicaId) alias in the common module, so if one day
/// we decide to change its aliasing target (again, actually: originally it was
/// `u8`), we only need to change it once in the common module instead of
/// finding every occurrence in the codebase. The framework module does not
/// serve this purpose.
///
/// In practice, the common module can be depended on by anything, even by
/// [`facade`] and [`protocol`], while framework should not be depended on by
/// either of them.
/// Protocol implementations.
///
/// This module exports implementations of [`Receiver`](facade::Receiver) and
/// [`Invoke`](facade::Invoke).
///
/// # Implementation convention
///
/// `Receiver`s provide a `register_new(config, &mut transport, ...)` function,
/// which constructs a receiver instance and registers it to `transport`. This
/// mimics the behavior of the specpaxos `TransportReceiver` constructor.
/// However, the conventional argument order is different: besides `config` and
/// `transport`, for a server-side receiver the third argument is usually
/// `replica_id`, and the fourth is usually `app`. These arguments are required
/// by almost all receivers.
///
/// `Invoke`able receivers should not depend on a specific asynchronous
/// facility. Instead, they should access asynchronous functionality through the
/// [`AsyncEcosystem`](facade::AsyncEcosystem) trait.
///
/// Non-`Invoke`able receivers, i.e. server nodes, should be built upon
/// [`stage`], even the single-threaded ones.
/// Application implementations.
///
/// This module exports implementations of [`App`](facade::App). The simplest
/// applications may be "client-free": they don't require complicated message
/// encoding or client-side behavior. One example of such applications is a
/// timestamp server.
///
/// Other applications, e.g. a client-coordinated transactional store, provide
/// a customized client along with the `App`. These clients have various
/// interfaces, but they probably leverage an `Invoke`able, i.e. a protocol
/// client, to communicate.
/// Engineering components.
///
/// The purpose here is to keep protocol implementations minimal. Decoupling
/// protocol implementations from reality helps improve the protocol module's
/// readability. (As this implies, code in framework is generally harder to
/// follow.)
///
/// The framework components expose all kinds of interfaces. The essential ones
/// are the [`Transport`](facade::Transport) and
/// [`AsyncEcosystem`](facade::AsyncEcosystem) implementations. The submodules
/// are named after the lower-level components involved, and the structure is
/// flattened.