1use std::{
10 cmp,
11 io::{self, Read, Write},
12 ops, ptr,
13 sync::atomic::{AtomicUsize, Ordering},
14};
15
16use crate::metrics::names;
17
18static BUFFER_COUNT: AtomicUsize = AtomicUsize::new(0);
19
20pub struct Pool {
21 pub inner: poule::Pool<BufferMetadata>,
22 pub buffer_size: usize,
23}
24
25impl Pool {
26 pub fn with_capacity(minimum: usize, maximum: usize, buffer_size: usize) -> Pool {
27 let mut inner = poule::Pool::with_extra(maximum, buffer_size);
28 inner.grow_to(minimum);
29 Pool { inner, buffer_size }
30 }
31
32 pub fn checkout(&mut self) -> Option<Checkout> {
33 if self.inner.used() == self.inner.capacity()
34 && self.inner.capacity() < self.inner.maximum_capacity()
35 {
36 self.inner.grow_to(std::cmp::min(
37 self.inner.capacity() * 2,
38 self.inner.maximum_capacity(),
39 ));
40 debug!(
41 "growing pool capacity from {} to {}",
42 self.inner.capacity(),
43 std::cmp::min(self.inner.capacity() * 2, self.inner.maximum_capacity())
44 );
45 }
46 let capacity = self.buffer_size;
47 self.inner
48 .checkout(|| {
49 trace!("initializing a buffer with capacity {}", capacity);
50 BufferMetadata::new()
51 })
52 .map(|c| {
53 let old_buffer_count = BUFFER_COUNT.fetch_add(1, Ordering::SeqCst);
54 gauge!(names::buffer::IN_USE, old_buffer_count + 1);
55 Checkout { inner: c }
56 })
57 }
58}
59
60impl ops::Deref for Pool {
61 type Target = poule::Pool<BufferMetadata>;
62
63 fn deref(&self) -> &Self::Target {
64 &self.inner
65 }
66}
67
68impl ops::DerefMut for Pool {
69 fn deref_mut(&mut self) -> &mut poule::Pool<BufferMetadata> {
70 &mut self.inner
71 }
72}
73
/// Cursor pair stored with every pooled buffer.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BufferMetadata {
    /// Index of the first readable byte.
    position: usize,
    /// Index one past the last readable byte.
    end: usize,
}

impl Default for BufferMetadata {
    /// Both cursors start at zero.
    fn default() -> Self {
        Self {
            position: 0,
            end: 0,
        }
    }
}

impl BufferMetadata {
    /// Creates metadata with both cursors at zero; same value as `default()`.
    pub fn new() -> BufferMetadata {
        Self::default()
    }
}
94
95pub struct Checkout {
96 pub inner: poule::Checkout<BufferMetadata>,
97}
98
99impl Drop for Checkout {
116 fn drop(&mut self) {
117 let old_buffer_count = BUFFER_COUNT.fetch_sub(1, Ordering::SeqCst);
118 gauge!(names::buffer::IN_USE, old_buffer_count - 1);
119 }
120}
121
122impl Checkout {
123 pub fn available_data(&self) -> usize {
124 self.inner.end - self.inner.position
125 }
126
127 pub fn available_space(&self) -> usize {
128 self.capacity() - self.inner.end
129 }
130
131 pub fn capacity(&self) -> usize {
132 self.inner.extra().len()
133 }
134
135 pub fn empty(&self) -> bool {
136 self.inner.position == self.inner.end
137 }
138
139 pub fn consume(&mut self, count: usize) -> usize {
140 let cnt = cmp::min(count, self.available_data());
141 self.inner.position += cnt;
142 if self.inner.position > self.capacity() / 2 {
143 self.shift();
145 }
146 cnt
147 }
148
149 pub fn fill(&mut self, count: usize) -> usize {
150 let cnt = cmp::min(count, self.available_space());
151 self.inner.end += cnt;
152 if self.available_space() < self.available_data() + cnt {
153 self.shift();
155 }
156
157 cnt
158 }
159
160 pub fn reset(&mut self) {
161 self.inner.position = 0;
162 self.inner.end = 0;
163 }
164
165 pub fn sync(&mut self, end: usize, position: usize) {
166 self.inner.position = position;
167 self.inner.end = end;
168 }
169
170 pub fn data(&self) -> &[u8] {
171 &self.inner.extra()[self.inner.position..self.inner.end]
172 }
173
174 pub fn space(&mut self) -> &mut [u8] {
175 let range = self.inner.end..self.capacity();
176 &mut self.inner.extra_mut()[range]
177 }
178
179 pub fn shift(&mut self) {
180 let pos = self.inner.position;
181 let end = self.inner.end;
182 if pos > 0 {
183 unsafe {
188 let length = end - pos;
189 ptr::copy(
190 self.inner.extra()[pos..end].as_ptr(),
191 self.inner.extra_mut()[..length].as_mut_ptr(),
192 length,
193 );
194 self.inner.position = 0;
195 self.inner.end = length;
196 }
197 }
198 }
199
200 pub fn delete_slice(&mut self, start: usize, length: usize) -> Option<usize> {
201 if start + length >= self.available_data() {
202 return None;
203 }
204
205 unsafe {
211 let begin = self.inner.position + start;
212 let next_end = self.inner.end - length;
213 ptr::copy(
214 self.inner.extra()[begin + length..self.inner.end].as_ptr(),
215 self.inner.extra_mut()[begin..next_end].as_mut_ptr(),
216 self.inner.end - (begin + length),
217 );
218 self.inner.end = next_end;
219 }
220 Some(self.available_data())
221 }
222
223 pub fn replace_slice(&mut self, data: &[u8], start: usize, length: usize) -> Option<usize> {
224 let data_len = data.len();
225 if start + length > self.available_data()
226 || self.inner.position + start + data_len > self.capacity()
227 {
228 return None;
229 }
230
231 unsafe {
238 let begin = self.inner.position + start;
239 let slice_end = begin + data_len;
240 if data_len < length {
242 ptr::copy(
243 data.as_ptr(),
244 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
245 data_len,
246 );
247
248 ptr::copy(
249 self.inner.extra()[start + length..self.inner.end].as_ptr(),
250 self.inner.extra_mut()[slice_end..].as_mut_ptr(),
251 self.inner.end - (start + length),
252 );
253 self.inner.end -= length - data_len;
254
255 } else {
257 ptr::copy(
258 self.inner.extra()[start + length..self.inner.end].as_ptr(),
259 self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
260 self.inner.end - (start + length),
261 );
262 ptr::copy(
263 data.as_ptr(),
264 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
265 data_len,
266 );
267 self.inner.end += data_len - length;
268 }
269 }
270 Some(self.available_data())
271 }
272
273 pub fn insert_slice(&mut self, data: &[u8], start: usize) -> Option<usize> {
274 let data_len = data.len();
275 if start > self.available_data()
276 || self.inner.position + self.inner.end + data_len > self.capacity()
277 {
278 return None;
279 }
280
281 unsafe {
287 let begin = self.inner.position + start;
288 let slice_end = begin + data_len;
289 ptr::copy(
290 self.inner.extra()[start..self.inner.end].as_ptr(),
291 self.inner.extra_mut()[start + data_len..].as_mut_ptr(),
292 self.inner.end - start,
293 );
294 ptr::copy(
295 data.as_ptr(),
296 self.inner.extra_mut()[begin..slice_end].as_mut_ptr(),
297 data_len,
298 );
299 self.inner.end += data_len;
300 }
301 Some(self.available_data())
302 }
303}
304
305impl Write for Checkout {
306 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
307 match self.space().write(buf) {
308 Ok(size) => {
309 self.fill(size);
310 Ok(size)
311 }
312 err => err,
313 }
314 }
315
316 fn flush(&mut self) -> io::Result<()> {
317 Ok(())
318 }
319}
320
321impl Read for Checkout {
322 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
323 let len = cmp::min(self.available_data(), buf.len());
324 unsafe {
330 ptr::copy(
331 self.inner.extra()[self.inner.position..self.inner.position + len].as_ptr(),
332 buf.as_mut_ptr(),
333 len,
334 );
335 self.inner.position += len;
336 }
337 Ok(len)
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};

    /// Builds a pool whose minimum and maximum buffer counts are both
    /// `max_count`.
    fn create_test_pool(buffer_size: usize, max_count: usize) -> Pool {
        let (minimum, maximum) = (max_count, max_count);
        Pool::with_capacity(minimum, maximum, buffer_size)
    }

    /// Checks a buffer out of `pool` and writes `data` into it, asserting
    /// that every byte was accepted.
    fn checkout_with_data(pool: &mut Pool, data: &[u8]) -> Checkout {
        let mut checkout = pool.checkout().expect("checkout should succeed");
        let written = checkout.write(data).expect("write should succeed");
        assert_eq!(written, data.len(), "all bytes should be written");
        checkout
    }

    // ---- pool ----

    #[test]
    fn test_pool_checkout_returns_buffer() {
        let mut pool = create_test_pool(1024, 2);
        let first = pool.checkout();
        assert!(
            first.is_some(),
            "first checkout from a fresh pool must succeed"
        );
        let buffer = first.unwrap();
        assert_eq!(buffer.capacity(), 1024);
        assert_eq!(buffer.available_data(), 0);
        assert_eq!(buffer.available_space(), 1024);
    }

    #[test]
    fn test_pool_checkin_on_drop() {
        let mut pool = create_test_pool(128, 1);
        let held = pool.checkout().expect("checkout should succeed");
        assert_eq!(pool.inner.used(), 1);
        // Dropping the checkout hands the buffer back to the pool.
        drop(held);
        assert_eq!(pool.inner.used(), 0);
        let again = pool.checkout();
        assert!(again.is_some(), "checkout after checkin should succeed");
    }

    #[test]
    fn test_pool_auto_grow() {
        let mut pool = Pool::with_capacity(1, 4, 256);
        let _first = pool.checkout().expect("first checkout");
        let _second = pool.checkout().expect("second checkout triggers growth");
        let _third = pool.checkout().expect("third checkout");
    }

    // ---- basic I/O ----

    #[test]
    fn test_checkout_write_and_read_data() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = pool.checkout().unwrap();

        let payload = b"hello world";
        let accepted = buffer.write(payload).unwrap();
        assert_eq!(accepted, payload.len());
        assert_eq!(buffer.available_data(), payload.len());
        assert_eq!(buffer.data(), payload);
    }

    #[test]
    fn test_checkout_read_trait() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        let mut sink = [0u8; 5];
        let read = buffer.read(&mut sink).unwrap();
        assert_eq!(read, 5);
        assert_eq!(&sink, b"hello");
    }

    #[test]
    fn test_consume_and_fill() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"abcdefghij");

        assert_eq!(buffer.consume(3), 3);
        assert_eq!(buffer.data(), b"defghij");
        assert_eq!(buffer.available_data(), 7);

        assert_eq!(buffer.fill(0), 0);
    }

    #[test]
    fn test_empty() {
        let mut pool = create_test_pool(64, 2);
        let buffer = pool.checkout().unwrap();
        assert!(buffer.empty(), "freshly checked-out buffer should be empty");
    }

    #[test]
    fn test_reset() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"data");
        assert!(!buffer.empty());

        buffer.reset();
        assert!(buffer.empty());
        assert_eq!(buffer.available_data(), 0);
    }

    #[test]
    fn test_sync() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");
        // Argument order is (end, position).
        buffer.sync(5, 2);
        assert_eq!(buffer.available_data(), 3);
    }

    // ---- shift ----

    #[test]
    fn test_shift_moves_data_to_start() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");

        buffer.inner.position = 5;
        assert_eq!(buffer.data(), b" world");

        buffer.shift();
        assert_eq!(buffer.inner.position, 0);
        assert_eq!(buffer.inner.end, 6);
        assert_eq!(buffer.data(), b" world");
    }

    #[test]
    fn test_shift_noop_when_position_zero() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        assert_eq!(buffer.inner.position, 0);
        buffer.shift();
        assert_eq!(buffer.data(), b"hello");
        assert_eq!(buffer.inner.position, 0);
        assert_eq!(buffer.inner.end, 5);
    }

    #[test]
    fn test_consume_triggers_auto_shift() {
        let mut pool = create_test_pool(256, 2);
        let mut buffer = pool.checkout().unwrap();
        let half = buffer.capacity() / 2;

        // Fill just past the midpoint...
        let fill_count = half + 2;
        let bytes: Vec<u8> = (0..fill_count as u8).collect();
        let accepted = buffer.write(&bytes).unwrap();
        assert_eq!(accepted, fill_count);

        // ...then consume past the midpoint so that `consume` shifts.
        let consume_count = half + 1;
        buffer.consume(consume_count);

        assert_eq!(buffer.inner.position, 0);
        assert_eq!(buffer.available_data(), fill_count - consume_count);
    }

    // ---- delete_slice ----

    #[test]
    fn test_delete_slice_middle() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world!");

        let outcome = buffer.delete_slice(3, 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"helrld!");
    }

    #[test]
    fn test_delete_slice_from_start() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world!");

        let outcome = buffer.delete_slice(0, 3);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"lo world!");
    }

    #[test]
    fn test_delete_slice_near_end() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world!");

        let outcome = buffer.delete_slice(7, 4);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello w!");
    }

    #[test]
    fn test_delete_slice_out_of_bounds_returns_none() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        // A range reaching the end of the data is rejected...
        let whole = buffer.delete_slice(0, 5);
        assert!(whole.is_none());

        // ...and so is one running past it.
        let past_end = buffer.delete_slice(3, 5);
        assert!(past_end.is_none());
    }

    #[test]
    fn test_delete_slice_single_byte() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"abcd");

        let outcome = buffer.delete_slice(1, 1);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"acd");
    }

    // ---- replace_slice ----

    #[test]
    fn test_replace_slice_same_size() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");

        let outcome = buffer.replace_slice(b"earth", 6, 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello earth");
    }

    #[test]
    fn test_replace_slice_shrink() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");

        let outcome = buffer.replace_slice(b"hi", 6, 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello hi");
    }

    #[test]
    fn test_replace_slice_grow() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");

        let outcome = buffer.replace_slice(b"universe", 6, 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello universe");
    }

    #[test]
    fn test_replace_slice_at_start() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello world");

        let outcome = buffer.replace_slice(b"hey", 0, 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hey world");
    }

    #[test]
    fn test_replace_slice_out_of_bounds_returns_none() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        let outcome = buffer.replace_slice(b"x", 4, 5);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_replace_slice_exceeds_data_bounds_returns_none() {
        let mut pool = create_test_pool(256, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        let outcome = buffer.replace_slice(b"xyz", 3, 5);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_replace_slice_replacement_exceeds_capacity_returns_none() {
        let mut pool = create_test_pool(256, 2);
        let mut buffer = pool.checkout().unwrap();
        let capacity = buffer.capacity();

        let filler = vec![b'x'; capacity];
        let accepted = buffer.write(&filler).unwrap();
        assert_eq!(accepted, capacity);

        // Leave only the last two bytes readable.
        buffer.inner.position = capacity - 2;
        assert_eq!(buffer.available_data(), 2);

        let outcome = buffer.replace_slice(b"abcde", 0, 1);
        assert!(outcome.is_none());
    }

    // ---- insert_slice ----

    #[test]
    fn test_insert_slice_at_start() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"world");

        let outcome = buffer.insert_slice(b"hello ", 0);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello world");
    }

    #[test]
    fn test_insert_slice_in_middle() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"helo");

        let outcome = buffer.insert_slice(b"l", 2);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello");
    }

    #[test]
    fn test_insert_slice_at_end() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        let outcome = buffer.insert_slice(b" world", 5);
        assert!(outcome.is_some());
        assert_eq!(buffer.data(), b"hello world");
    }

    #[test]
    fn test_insert_slice_exceeds_capacity_returns_none() {
        let mut pool = create_test_pool(256, 2);
        let mut buffer = pool.checkout().unwrap();
        let capacity = buffer.capacity();

        let filler = vec![b'x'; capacity];
        let accepted = buffer.write(&filler).unwrap();
        assert_eq!(accepted, capacity);
        assert_eq!(buffer.available_space(), 0);

        let outcome = buffer.insert_slice(b"y", 0);
        assert!(outcome.is_none());
    }

    #[test]
    fn test_insert_slice_beyond_data_returns_none() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello");

        let outcome = buffer.insert_slice(b"x", 6);
        assert!(outcome.is_none());
    }

    // ---- combined scenarios ----

    #[test]
    fn test_write_consume_shift_write_again() {
        let mut pool = create_test_pool(32, 2);
        let mut buffer = checkout_with_data(&mut pool, b"first");

        buffer.consume(5);
        assert_eq!(buffer.available_data(), 0);

        let accepted = buffer.write(b"second").unwrap();
        assert_eq!(accepted, 6);
        assert_eq!(buffer.data(), b"second");
    }

    #[test]
    fn test_delete_then_insert() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"hello cruel world");

        let _ = buffer.delete_slice(6, 6);
        assert_eq!(buffer.data(), b"hello world");

        let _ = buffer.insert_slice(b"beautiful ", 6);
        assert_eq!(buffer.data(), b"hello beautiful world");
    }

    #[test]
    fn test_multiple_replace_operations() {
        let mut pool = create_test_pool(1024, 2);
        let mut buffer = checkout_with_data(&mut pool, b"aXbXc");

        let _ = buffer.replace_slice(b"12", 1, 1);
        assert_eq!(buffer.data(), b"a12bXc");

        let _ = buffer.replace_slice(b"34", 4, 1);
        assert_eq!(buffer.data(), b"a12b34c");
    }
}