Skip to main content

typhoon/bytes/
dynamic.rs

1#[cfg(test)]
2#[path = "../../tests/bytes/dynamic.rs"]
3mod tests;
4
5use std::cell::UnsafeCell;
6use std::fmt::{Debug, Formatter, Result as FmtResult};
7use std::marker::PhantomData;
8use std::slice::{from_raw_parts, from_raw_parts_mut};
9use std::sync::Arc;
10
11use rand::Fill;
12
13use crate::bytes::common::{ByteBuffer, ByteBufferMut};
14use crate::bytes::holder::BufferHolder;
15use crate::bytes::pool::PoolReturn;
16use crate::bytes::r#static::StaticByteBuffer;
17use crate::bytes::utils::copy_slice;
18
19/// A mutable byte buffer with Arc-based reference counting.
20/// Send but not Sync - can be moved between threads but not shared.
21/// Pool-attached buffers are returned to the pool on drop.
22pub struct DynamicByteBuffer {
23    holder: Arc<BufferHolder>,
24    length: usize,
25    start: usize,
26    end: usize,
27    _not_sync: PhantomData<UnsafeCell<()>>,
28}
29
30impl DynamicByteBuffer {
31    pub(crate) fn new(data: *mut u8, capacity: usize, before_cap: usize, size: usize, after_cap: usize, return_tx: PoolReturn) -> Self {
32        let buffer_end = before_cap + size;
33        DynamicByteBuffer {
34            holder: Arc::new(BufferHolder::new(data, capacity, return_tx)),
35            length: buffer_end + after_cap,
36            start: before_cap,
37            end: buffer_end,
38            _not_sync: PhantomData,
39        }
40    }
41
42    #[inline]
43    pub(super) fn data_ptr(&self) -> *mut u8 {
44        self.holder.data
45    }
46
47    /// Convert to an immutable StaticByteBuffer (deep copy, capacity trimmed).
48    #[inline]
49    pub fn copy(&self) -> Self {
50        DynamicByteBuffer {
51            holder: Arc::new(self.holder.copy()),
52            length: self.length,
53            start: self.start,
54            end: self.end,
55            _not_sync: PhantomData,
56        }
57    }
58
59    /// Convert to an immutable StaticByteBuffer (deep copy, capacity trimmed).
60    #[inline]
61    pub fn to_owned(&self) -> StaticByteBuffer {
62        StaticByteBuffer::from_slice(self.slice())
63    }
64
65    /// Returns the number of bytes available for prepending before the current view.
66    #[inline]
67    pub fn before_capacity(&self) -> usize {
68        self.start
69    }
70
71    /// Append `other` buffer contents to end. Returns expanded view.
72    #[inline]
73    pub fn append_buf(&self, other: &impl ByteBuffer) -> Self {
74        self.append(other.slice())
75    }
76
77    /// Prepend `other` buffer contents to start. Returns expanded view.
78    #[inline]
79    pub fn prepend_buf(&self, other: &impl ByteBuffer) -> Self {
80        self.prepend(other.slice())
81    }
82}
83
84impl ByteBuffer for DynamicByteBuffer {
85    #[inline]
86    fn len(&self) -> usize {
87        self.end - self.start
88    }
89
90    #[inline]
91    fn get(&self, at: usize) -> &u8 {
92        assert!(at < self.len(), "index out of bounds: {} >= {}", at, self.len());
93        unsafe { &*self.data_ptr().add(self.start + at) }
94    }
95
96    #[inline]
97    fn slice(&self) -> &[u8] {
98        unsafe { from_raw_parts(self.data_ptr().add(self.start), self.len()) }
99    }
100
101    #[inline]
102    fn slice_start(&self, start: usize) -> &[u8] {
103        assert!(start <= self.len(), "start out of bounds");
104        unsafe { from_raw_parts(self.data_ptr().add(self.start + start), self.len() - start) }
105    }
106
107    #[inline]
108    fn slice_end(&self, end: usize) -> &[u8] {
109        assert!(end <= self.len(), "end out of bounds");
110        unsafe { from_raw_parts(self.data_ptr().add(self.start), end) }
111    }
112
113    #[inline]
114    fn slice_both(&self, start: usize, end: usize) -> &[u8] {
115        assert!(start <= end && end <= self.len(), "invalid slice bounds");
116        unsafe { from_raw_parts(self.data_ptr().add(self.start + start), end - start) }
117    }
118
119    #[inline]
120    fn split(&self, divide: usize) -> (&[u8], &[u8]) {
121        assert!(divide <= self.len(), "divide point out of bounds");
122        unsafe {
123            let ptr = self.data_ptr().add(self.start);
124            (from_raw_parts(ptr, divide), from_raw_parts(ptr.add(divide), self.len() - divide))
125        }
126    }
127}
128
129impl ByteBufferMut for DynamicByteBuffer {
130    #[inline]
131    fn set(&self, at: usize, value: u8) {
132        assert!(at < self.len(), "index out of bounds: {} >= {}", at, self.len());
133        unsafe {
134            *self.data_ptr().add(self.start + at) = value;
135        }
136    }
137
138    #[inline]
139    fn slice_mut(&self) -> &mut [u8] {
140        unsafe { from_raw_parts_mut(self.data_ptr().add(self.start), self.len()) }
141    }
142
143    #[inline]
144    fn slice_start_mut(&self, start: usize) -> &mut [u8] {
145        assert!(start <= self.len(), "start out of bounds");
146        unsafe { from_raw_parts_mut(self.data_ptr().add(self.start + start), self.len() - start) }
147    }
148
149    #[inline]
150    fn slice_end_mut(&self, end: usize) -> &mut [u8] {
151        assert!(end <= self.len(), "end out of bounds");
152        unsafe { from_raw_parts_mut(self.data_ptr().add(self.start), end) }
153    }
154
155    #[inline]
156    fn slice_both_mut(&self, start: usize, end: usize) -> &mut [u8] {
157        assert!(start <= end && end <= self.len(), "invalid slice bounds");
158        unsafe { from_raw_parts_mut(self.data_ptr().add(self.start + start), end - start) }
159    }
160
161    #[inline]
162    fn split_mut(&self, divide: usize) -> (&mut [u8], &mut [u8]) {
163        assert!(divide <= self.len(), "divide point out of bounds");
164        unsafe {
165            let ptr = self.data_ptr().add(self.start);
166            (from_raw_parts_mut(ptr, divide), from_raw_parts_mut(ptr.add(divide), self.len() - divide))
167        }
168    }
169
170    fn rebuffer_start(&self, start: usize) -> Self {
171        let new_start = self.start + start;
172        assert!(new_start <= self.end, "DynamicByteBuffer has negative length ({} > {new_start})!", self.end);
173        DynamicByteBuffer {
174            holder: Arc::clone(&self.holder),
175            length: self.length,
176            start: new_start,
177            end: self.end,
178            _not_sync: PhantomData,
179        }
180    }
181
182    fn rebuffer_end(&self, end: usize) -> Self {
183        let new_end = self.start + end;
184        assert!(new_end <= self.length, "DynamicByteBuffer exceeded its forward capacity ({new_end} > {})!", self.length);
185        DynamicByteBuffer {
186            holder: Arc::clone(&self.holder),
187            length: self.length,
188            start: self.start,
189            end: new_end,
190            _not_sync: PhantomData,
191        }
192    }
193
194    fn rebuffer_both(&self, start: usize, end: usize) -> Self {
195        let new_start = self.start + start;
196        let new_end = self.start + end;
197        assert!(new_start <= new_end, "DynamicByteBuffer has negative length ({new_end} > {new_start})!");
198        assert!(new_end <= self.length, "DynamicByteBuffer exceeded its forward capacity ({new_end} > {})!", self.length);
199        DynamicByteBuffer {
200            holder: Arc::clone(&self.holder),
201            length: self.length,
202            start: new_start,
203            end: new_end,
204            _not_sync: PhantomData,
205        }
206    }
207
208    fn expand_start(&self, size: usize) -> Self {
209        assert!(size <= self.start, "DynamicByteBuffer has negative length ({size} > {})!", self.start);
210        let new_start = self.start - size;
211        DynamicByteBuffer {
212            holder: Arc::clone(&self.holder),
213            length: self.length,
214            start: new_start,
215            end: self.end,
216            _not_sync: PhantomData,
217        }
218    }
219
220    fn expand_end(&self, size: usize) -> Self {
221        let new_end = self.end + size;
222        assert!(new_end <= self.length, "DynamicByteBuffer exceeded its forward capacity ({new_end} > {})!", self.length);
223        DynamicByteBuffer {
224            holder: Arc::clone(&self.holder),
225            length: self.length,
226            start: self.start,
227            end: new_end,
228            _not_sync: PhantomData,
229        }
230    }
231
232    fn split_buf_start(&self, divide: usize) -> (Self, Self) {
233        let new_divide = self.start + divide;
234        assert!(new_divide <= self.end, "DynamicByteBuffer has negative length ({new_divide} > {})!", self.end);
235        (
236            DynamicByteBuffer {
237                holder: Arc::clone(&self.holder),
238                length: self.length,
239                start: self.start,
240                end: new_divide,
241                _not_sync: PhantomData,
242            },
243            DynamicByteBuffer {
244                holder: Arc::clone(&self.holder),
245                length: self.length,
246                start: new_divide,
247                end: self.end,
248                _not_sync: PhantomData,
249            },
250        )
251    }
252
253    #[inline]
254    fn split_buf_end(&self, divide: usize) -> (Self, Self) {
255        self.split_buf_start(self.len() - divide)
256    }
257
258    fn append(&self, other: &[u8]) -> Self {
259        let new_end = self.end + other.len();
260        assert!(new_end <= self.length, "DynamicByteBuffer backward capacity insufficient ({new_end} > {})!", self.length);
261        copy_slice(unsafe { self.data_ptr().add(self.end) }, other);
262        DynamicByteBuffer {
263            holder: Arc::clone(&self.holder),
264            length: self.length,
265            start: self.start,
266            end: new_end,
267            _not_sync: PhantomData,
268        }
269    }
270
271    fn prepend(&self, other: &[u8]) -> Self {
272        let other_length = other.len();
273        assert!(other_length <= self.start, "DynamicByteBuffer forward capacity insufficient ({other_length} > {})!", self.start);
274        let new_start = self.start - other_length;
275        copy_slice(unsafe { self.data_ptr().add(new_start) }, other);
276        DynamicByteBuffer {
277            holder: Arc::clone(&self.holder),
278            length: self.length,
279            start: new_start,
280            end: self.end,
281            _not_sync: PhantomData,
282        }
283    }
284
285    fn ensure_size(&self, size: usize) -> Self {
286        if size > self.len() {
287            self.expand_end(size - self.len())
288        } else {
289            DynamicByteBuffer {
290                holder: Arc::clone(&self.holder),
291                length: self.length,
292                start: self.start,
293                end: self.start + size,
294                _not_sync: PhantomData,
295            }
296        }
297    }
298}
299
300impl AsMut<[u8]> for DynamicByteBuffer {
301    #[inline]
302    fn as_mut(&mut self) -> &mut [u8] {
303        self.slice_mut()
304    }
305}
306
307impl AsRef<[u8]> for DynamicByteBuffer {
308    #[inline]
309    fn as_ref(&self) -> &[u8] {
310        self.slice()
311    }
312}
313
314impl From<DynamicByteBuffer> for Vec<u8> {
315    #[inline]
316    fn from(val: DynamicByteBuffer) -> Self {
317        val.slice().to_vec()
318    }
319}
320
321impl<const N: usize> From<&DynamicByteBuffer> for [u8; N] {
322    #[inline]
323    fn from(val: &DynamicByteBuffer) -> Self {
324        match <[u8; N]>::try_from(val.slice()) {
325            Ok(res) => res,
326            Err(err) => panic!("error converting DynamicByteBuffer to array [u8; {N}], actual buffer length {}: {}", val.len(), err),
327        }
328    }
329}
330
331impl PartialEq for DynamicByteBuffer {
332    #[inline]
333    fn eq(&self, other: &Self) -> bool {
334        self.slice() == other.slice()
335    }
336}
337
338impl Fill for DynamicByteBuffer {
339    fn try_fill<R: rand::Rng + ?Sized>(&mut self, rng: &mut R) -> Result<(), rand::Error> {
340        rng.try_fill_bytes(self.slice_mut())
341    }
342}
343
344impl Debug for DynamicByteBuffer {
345    #[inline]
346    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
347        f.debug_struct("DynamicByteBuffer").field("length", &self.length).field("start", &self.start).field("end", &self.end).field("view_length", &self.len()).field("data", &self.slice()).finish()
348    }
349}
350
351impl Clone for DynamicByteBuffer {
352    /// Zero-copy clone: increments the `Arc<BufferHolder>` refcount and produces a second
353    /// view into the **same** backing allocation.
354    ///
355    /// # Aliasing invariant
356    /// Both the original and the clone share the same raw memory.  The caller must ensure that
357    /// no two live mutable views (via `slice_mut`, `set`, etc.) to **overlapping** byte ranges
358    /// are used concurrently — including across threads, since `DynamicByteBuffer: Send`.
359    /// Non-overlapping views (e.g. produced by `split_buf_start`/`split_buf_end`) may be used independently.
360    #[inline]
361    fn clone(&self) -> Self {
362        DynamicByteBuffer {
363            holder: Arc::clone(&self.holder),
364            length: self.length,
365            start: self.start,
366            end: self.end,
367            _not_sync: PhantomData,
368        }
369    }
370}