1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
use super::*;
use bytes::{buf::UninitSlice, Bytes};
use std::{cmp::Ordering, ptr::NonNull};
/// A `ChunkLease` is a smart-pointer to a byte-slice, implementing the [Buf](bytes::Buf) and
/// [BufMut](bytes::BufMut) interfaces. A lease is created from one or many distinct slices of
/// [BufferChunks](BufferChunk) and holds a lock preventing the data in the byte-slice(s) from being
/// modified until the `ChunkLease` is dropped.
///
/// Several leases can be linked into a chain; the read and write pointers of the first lease
/// are offsets into the chain as a whole.
#[derive(Debug)]
pub struct ChunkLease {
/// The bytes held by this link of the chain.
content: &'static mut [u8],
/// Offset from the start of the chain at which the `BufMut` implementation writes next.
write_pointer: usize,
/// Offset from the start of the chain at which the `Buf` implementation reads next.
read_pointer: usize,
/// The length of `content`, i.e. of this link only.
chain_head_len: usize,
/// Reference-counted token handed in on construction and cloned whenever the lease is split.
lock: Arc<()>,
/// The links following this one, if any.
chain: Option<Box<ChunkLease>>,
/// The length of the chain from self (i.e. independent of parent(s))
chain_len: usize,
}
impl ChunkLease {
/// Wraps `content` in a fresh, unchained `ChunkLease` guarded by `lock`.
///
/// The slice is treated as already written in full: the write pointer starts at the end of
/// `content`, the read pointer at its beginning.
pub fn new(content: &'static mut [u8], lock: Arc<()>) -> ChunkLease {
    // A single link, so the head length and the chain length coincide.
    let len = content.len();
    ChunkLease {
        content,
        lock,
        chain: None,
        read_pointer: 0,
        write_pointer: len,
        chain_head_len: len,
        chain_len: len,
    }
}
/// Builds a `ChunkLease` that holds no bytes and owns a lock of its own.
pub fn empty() -> ChunkLease {
    // SAFETY: the slice has length 0, so the pointer is never dereferenced; a dangling pointer
    // is non-null and well aligned, which is all a zero-length slice requires.
    let no_bytes: &'static mut [u8] =
        unsafe { std::slice::from_raw_parts_mut(NonNull::dangling().as_ptr(), 0) };
    ChunkLease::new(no_bytes, Arc::new(()))
}
/// Returns the total number of bytes covered by this lease and every lease chained after it.
///
/// The value ignores the read and write pointers; use `remaining()`/`remaining_mut()` for the
/// lengths that are still readable/writable.
pub fn capacity(&self) -> usize {
self.chain_len
}
/// Encodes `head` at the very front of this `ChunkLease`.
///
/// No room is made for the head here: the lease has to be padded by the caller beforehand.
///
/// Proper framing thus requires 1. pad(), 2. serialise into DecodeBuffer, 3. get_chunk_lease,
/// 4. insert_head
pub(crate) fn insert_head(&mut self, mut head: FrameHead) {
    // Park the write pointer at the front of the buffer, remembering where it was.
    let saved_write_pointer = std::mem::replace(&mut self.write_pointer, 0);
    // Encode the head into self, starting at the parked write pointer.
    head.encode_into(self);
    // Put the write pointer back where the already written data ends.
    self.write_pointer = saved_write_pointer;
}
/// Links `new_tail` behind the last lease of this chain.
///
/// Every lease on the way to the end has its `chain_len` grown by the length of `new_tail`.
pub(crate) fn append_to_chain(&mut self, new_tail: ChunkLease) {
    let added_len = new_tail.chain_len;
    let mut link: &mut ChunkLease = self;
    loop {
        // Each link counts the bytes of everything behind it, so all of them grow.
        link.chain_len += added_len;
        if link.chain.is_none() {
            break;
        }
        link = link
            .chain
            .as_deref_mut()
            .expect("chain presence was checked just above");
    }
    // `link` is now the last lease of the chain.
    link.chain = Some(Box::new(new_tail));
}
/// Splits self into two `ChunkLeases` such that `self` has the length of `position`.
///
/// The returned lease holds everything from `position` onwards, including any leases that were
/// chained behind the split point. If `position` equals the current capacity an empty lease
/// is returned.
///
/// This operation does not account for read and write pointers.
///
/// # Panics
///
/// Panics if `position` is greater than `capacity()`, or if the chain turns out to hold fewer
/// bytes than its recorded length.
pub(crate) fn split_at(&mut self, position: usize) -> ChunkLease {
    assert!(
        position <= self.capacity(),
        "Trying to split at bad position: {}, read_pointer: {}, \
         chain_head_len: {}, chain_len: {}",
        position,
        self.read_pointer,
        self.chain_head_len,
        self.chain_len,
    );
    // Whichever case applies below, this lease ends up covering exactly `position` bytes.
    self.chain_len = position;
    match position.cmp(&self.chain_head_len) {
        Ordering::Greater => {
            // The split point lies in a later link: recurse with the position made relative
            // to that link.
            if let Some(tail) = &mut self.chain {
                return tail.split_at(position - self.chain_head_len);
            }
            // The recorded chain length promised more bytes than the chain holds.
            panic!(
                "Trying to split faulty ChunkLease {:#?} at position: {}, should never happen",
                &self, &position
            )
        }
        Ordering::Equal => {
            // The split falls on the boundary after this link: hand out the rest of the chain,
            // or an empty lease when splitting at the very end.
            match self.chain.take() {
                Some(tail_chain) => *tail_chain,
                None => ChunkLease::empty(),
            }
        }
        Ordering::Less => {
            // The split falls inside this link's own bytes. Take the slice out of `self` so it
            // can be divided by value into two non-overlapping `'static` halves.
            let whole = std::mem::take(&mut self.content);
            let (head_bytes, tail_bytes) = whole.split_at_mut(position);
            self.content = head_bytes;
            self.chain_head_len = position;
            // Both halves point into the same underlying data, so they share the lock.
            let mut return_lease = ChunkLease::new(tail_bytes, self.lock.clone());
            if let Some(tail_chain) = self.chain.take() {
                return_lease.append_to_chain(*tail_chain);
            }
            return_lease
        }
    }
}
// Recursive method for the chunk() impl.
//
// Returns the bytes from `pos` (an offset from the start of this link) to the end of the link
// that contains `pos`. When `pos` is exactly the end of the last link an empty slice is
// returned, as `Buf::chunk` must not panic once the buffer is exhausted.
//
// Panics if `pos` lies beyond the end of the chain.
fn get_chunk_at(&self, pos: usize) -> &[u8] {
    if pos < self.chain_head_len {
        return &self.content[pos..];
    }
    match &self.chain {
        // `pos` belongs to a later link; make it relative to that link.
        Some(chain) => chain.get_chunk_at(pos - self.chain_head_len),
        // Exactly at the end of the data: nothing left to hand out.
        None if pos == self.chain_head_len => &[],
        None => panic!("Critical Bug in ChunkLease, bad chain"),
    }
}
// Recursive method for the chunk_mut() impl.
//
// Returns the writable bytes from `pos` (an offset from the start of this link) to the end of
// the link that contains `pos`. When `pos` is exactly the end of the last link an empty slice
// is returned, as `BufMut::chunk_mut` must not panic once the buffer is full.
//
// Panics if `pos` lies beyond the end of the chain.
fn get_chunk_mut_at(&mut self, pos: usize) -> &mut UninitSlice {
    let head_len = self.chain_head_len;
    if pos >= head_len {
        if let Some(chain) = &mut self.chain {
            // `pos` belongs to a later link; make it relative to that link.
            return chain.get_chunk_mut_at(pos - head_len);
        }
        // Without a further link only the exact end of the data is a valid position.
        assert!(pos == head_len, "Critical Bug in ChunkLease, bad chain");
    }
    // SAFETY: `pos <= head_len` holds here and `head_len` is the length of `content`, so the
    // offset pointer is inside `content` or one past its end, and the `head_len - pos` bytes
    // that follow it all belong to `content`.
    unsafe {
        let offset_ptr = self.content.as_mut_ptr().add(pos);
        UninitSlice::from_raw_parts_mut(offset_ptr, head_len - pos)
    }
}
/// Transforms this `ChunkLease` into a [ChunkRef](ChunkRef), an immutable and cloneable smart-pointer
///
/// Data stored before the current `read_pointer` location will be dropped and the new `ChunkRef`
/// will have a `read_pointer` at 0.
pub fn into_chunk_ref(mut self) -> ChunkRef {
if self.read_pointer > 0 {
let mut tail = self.split_at(self.read_pointer);
tail.read_pointer = 0;
tail.into_chunk_ref()
} else {
let chain = self.chain.map(|chain| Box::new(chain.into_chunk_ref()));
ChunkRef::new(
self.content,
self.read_pointer,
self.chain_head_len,
self.lock,
chain,
self.chain_len,
)
}
}
/// Converts this lease into a [ChunkRef](ChunkRef) and chains `tail` behind it.
pub fn into_chunk_ref_with_tail(self, tail: ChunkRef) -> ChunkRef {
    let mut combined = self.into_chunk_ref();
    combined.append_to_chain(tail);
    combined
}
/// Converts this lease into a [ChunkRef](ChunkRef) and chains it behind `head`, which is
/// returned as the front of the resulting `ChunkRef`.
pub fn into_chunk_ref_with_head(self, mut head: ChunkRef) -> ChunkRef {
    let converted = self.into_chunk_ref();
    head.append_to_chain(converted);
    head
}
/// Copies the *remaining* (unread) bytes of the whole chain into a freshly allocated `Bytes`.
/// The lease itself, including its read pointer, is left untouched.
/// This is a costly operation and should be avoided.
pub fn create_byte_clone(&self) -> Bytes {
    let mut copied: Vec<u8> = Vec::with_capacity(self.remaining());
    // Local cursor, so that the real read pointer does not move.
    let mut cursor = self.read_pointer;
    loop {
        if cursor >= self.chain_len {
            break Bytes::from(copied);
        }
        // Each segment ends where the link containing `cursor` ends.
        let segment = self.get_chunk_at(cursor);
        copied.extend_from_slice(segment);
        cursor += segment.len();
    }
}
}
impl Buf for ChunkLease {
    /// Number of bytes between the read pointer and the end of the chain.
    fn remaining(&self) -> usize {
        self.chain_len - self.read_pointer
    }

    /// The readable bytes starting at the read pointer, limited to the link of the chain that
    /// contains the read pointer.
    fn chunk(&self) -> &[u8] {
        self.get_chunk_at(self.read_pointer)
    }

    /// Moves the read pointer forward by `cnt` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `cnt` is greater than `remaining()`. Letting the read pointer run past the end
    /// of the chain would make the subtraction in `remaining()` underflow later on.
    fn advance(&mut self, cnt: usize) {
        let remaining = self.remaining();
        assert!(
            cnt <= remaining,
            "cannot advance ChunkLease past its end: cnt: {}, remaining: {}",
            cnt,
            remaining,
        );
        self.read_pointer += cnt;
    }
}
// `BufMut` is currently only used for injecting a `FrameHead` at the front of the lease.
unsafe impl BufMut for ChunkLease {
/// Number of bytes between the write pointer and the end of the chain.
fn remaining_mut(&self) -> usize {
self.chain_len - self.write_pointer
}
/// Moves the write pointer forward by `cnt` bytes. No bounds check is performed here.
unsafe fn advance_mut(&mut self, cnt: usize) {
self.write_pointer += cnt;
}
/// The writable bytes starting at the write pointer, limited to the link of the chain that
/// contains the write pointer.
fn chunk_mut(&mut self) -> &mut UninitSlice {
self.get_chunk_mut_at(self.write_pointer)
}
}
// Manually asserts that a `ChunkLease` may be moved to another thread.
// NOTE(review): the field types declared on `ChunkLease` (`&'static mut [u8]`, `usize`,
// `Arc<()>`, `Option<Box<ChunkLease>>`) all look `Send` by themselves, so this impl may be
// redundant — confirm before relying on or removing it.
unsafe impl Send for ChunkLease {}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
// Shared driver for the chain/split tests; different chunk and data sizes exercise different
// kinds of chains and splits.
//
// Two distinct test-strings of `data_len` bytes each are written back to back into an
// `EncodeBuffer` whose chunks are `chunk_len` bytes, and extracted as one `ChunkLease`.
// That lease is split down the middle and both halves are compared against the test-strings.
// Returns the buffer together with the (by then fully read) first and second half.
fn chain_and_split_test(
    chunk_len: usize,
    data_len: usize,
) -> (EncodeBuffer, ChunkLease, ChunkLease) {
    // One character per index: the leading decimal digit of that index.
    let leading_digits = |indices: std::ops::Range<usize>| -> String {
        indices
            .map(|i| i.to_string().chars().next().unwrap())
            .collect()
    };

    let mut cfg = BufferConfig::default();
    cfg.chunk_size(chunk_len);
    cfg.initial_chunk_count(2);
    let mut encode_buffer = EncodeBuffer::with_config(&cfg, &None);

    let first_text = leading_digits(0..data_len);
    let second_text = leading_digits(data_len..data_len * 2);

    // Write both strings through one encoder so they come out as a single lease.
    let mut combined = {
        let mut encoder = encode_buffer
            .get_buffer_encoder()
            .expect("Should not run out of buffers in test case");
        encoder.put_slice(first_text.as_bytes());
        encoder.put_slice(second_text.as_bytes());
        encoder.get_chunk_lease().unwrap()
    };
    // Lengths before the split
    assert_eq!(combined.remaining(), first_text.len() * 2);

    // Split the combined lease down the middle
    let midpoint = combined.remaining() / 2;
    let mut second_half = combined.split_at(midpoint);
    let mut first_half = combined; // reads better in the assertions below

    // Lengths after the split
    assert_eq!(second_half.remaining(), first_half.remaining());
    assert_eq!(first_text.len(), second_half.remaining());

    let expected_first = Bytes::copy_from_slice(first_text.as_bytes());
    let expected_second = Bytes::copy_from_slice(second_text.as_bytes());
    // create_byte_clone() must reproduce the data without consuming it
    assert_eq!(expected_first, first_half.create_byte_clone());
    assert_eq!(expected_second, second_half.create_byte_clone());
    // Reading the halves out must yield the same data
    assert_eq!(
        expected_first,
        first_half.copy_to_bytes(first_half.remaining())
    );
    assert_eq!(
        expected_second,
        second_half.copy_to_bytes(second_half.remaining())
    );

    encode_buffer
        .swap_buffer() // ensure that the last chunk is swapped
        .expect("Should not run out of buffers in test case");
    (encode_buffer, first_half, second_half)
}
// Three scenarios for chaining and splitting:
// 1st: chains both before and after the split
// 2nd: no chains at all
// 3rd: a chain before the split but none after it (split on the chain boundary)
// Byte contents and lengths are asserted inside `chain_and_split_test`.
#[test]
fn chunk_lease_chain_and_split_with_chains() {
    let (mut encode_buffer, head_half, tail_half) = chain_and_split_test(128, 300);
    // Contents were verified by the helper; here only this scenario's own properties remain.
    // 300 bytes per half do not fit into one 128-byte chunk, so both halves are chained.
    assert!(head_half.chain.is_some());
    assert!(tail_half.chain.is_some());
    // 600 bytes in total occupy four full chunks plus one partial chunk.
    assert_eq!(
        encode_buffer.buffer_pool.count_locked_chunks(),
        600 / 128 + 1
    );
}
#[test]
fn chunk_lease_chain_and_split_without_chains() {
    // 2 * 64 bytes fit into a single 128-byte chunk, so nothing is ever chained.
    let (mut encode_buffer, head_half, tail_half) = chain_and_split_test(128, 64);
    assert!(head_half.chain.is_none());
    assert!(tail_half.chain.is_none());
    assert_eq!(encode_buffer.buffer_pool.count_locked_chunks(), 1);
}
#[test]
fn chunk_lease_chain_and_split_with_distinct_chunks() {
    // Each 128-byte half fills exactly one chunk, so the split lands on the chain boundary
    // and neither half is chained afterwards.
    let (mut encode_buffer, head_half, tail_half) = chain_and_split_test(128, 128);
    assert!(head_half.chain.is_none());
    assert!(tail_half.chain.is_none());
    assert_eq!(encode_buffer.buffer_pool.count_locked_chunks(), 2);
}
}