typeline_core/record_data/bytes_insertion_stream.rs

1use std::io::Write;
2
3use bstr::ByteSlice;
4
5use crate::utils::{
6    maybe_text::MaybeTextRef,
7    text_write::{MaybeTextWrite, TextWrite},
8};
9
10use super::{
11    field_data::{
12        field_value_flags, FieldData, FieldValueFormat, FieldValueRepr,
13        INLINE_STR_MAX_LEN,
14    },
15    push_interface::PushInterface,
16};
17
18struct RawBytesInserter<'a> {
19    fd: &'a mut FieldData,
20    run_len: usize,
21    ptr: *mut u8,
22    bytes_inserted: usize,
23    cap: usize,
24    // PERF: we could optimize this to make this type much smaller
25    target: Vec<u8>,
26}
27
28impl<'a> RawBytesInserter<'a> {
29    pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
30        Self {
31            ptr: fd.data.tail_ptr_mut(),
32            bytes_inserted: 0,
33            cap: fd.data.contiguous_tail_space_available(),
34            target: Vec::new(),
35            fd,
36            run_len,
37        }
38    }
39    fn write_bytes(&mut self, buf: &[u8]) {
40        let buf_len = buf.len();
41        let len_new = self.bytes_inserted + buf_len;
42        if len_new > INLINE_STR_MAX_LEN {
43            if self.target.is_empty() {
44                self.target.extend_from_slice(unsafe {
45                    std::slice::from_raw_parts(
46                        self.ptr.sub(self.bytes_inserted),
47                        self.bytes_inserted,
48                    )
49                });
50            }
51            self.target.extend(buf);
52            self.bytes_inserted = len_new;
53            return;
54        }
55
56        if len_new > self.cap {
57            let data_len = self.fd.data.len();
58            unsafe {
59                self.fd.data.set_len(data_len + self.bytes_inserted);
60                self.fd
61                    .data
62                    .reserve_contiguous(len_new, self.bytes_inserted);
63                self.fd.data.set_len(data_len);
64            }
65            self.ptr = self.fd.data.tail_ptr_mut();
66            self.cap = self.fd.data.contiguous_tail_space_available();
67        }
68        unsafe {
69            std::ptr::copy_nonoverlapping(buf.as_ptr(), self.ptr, buf_len);
70            self.ptr = self.ptr.add(buf_len);
71        }
72        self.bytes_inserted = len_new;
73    }
74    unsafe fn commit_inline(&mut self, text: bool) {
75        let fmt = FieldValueFormat {
76            repr: if text {
77                FieldValueRepr::TextInline
78            } else {
79                FieldValueRepr::BytesInline
80            },
81            flags: field_value_flags::SHARED_VALUE,
82            size: self.bytes_inserted as u16,
83        };
84        self.fd.field_count += self.run_len;
85        let header_rle = self.run_len == 1
86            && self
87                .fd
88                .headers
89                .back()
90                .map(|h| h.is_value_appendable(fmt))
91                .unwrap_or(false);
92        unsafe {
93            self.fd
94                .data
95                .set_len(self.fd.data.len() + self.bytes_inserted);
96            self.fd.add_header_for_single_value(
97                fmt,
98                self.run_len,
99                header_rle,
100                false,
101            )
102        }
103    }
104    unsafe fn commit_heap(&mut self, text: bool) {
105        let buffer = std::mem::take(&mut self.target);
106        if text {
107            self.fd.push_string(
108                unsafe { String::from_utf8_unchecked(buffer) },
109                self.run_len,
110                true,
111                false,
112            );
113        } else {
114            self.fd.push_bytes_buffer(buffer, self.run_len, true, false);
115        }
116    }
117    unsafe fn commit(&mut self, text: bool) {
118        unsafe {
119            if self.bytes_inserted > INLINE_STR_MAX_LEN {
120                self.commit_heap(text);
121                return;
122            }
123            self.commit_inline(text);
124        }
125    }
126    unsafe fn commit_maybe_text(&mut self) {
127        unsafe { self.commit(self.get_inserted_data().is_utf8()) }
128    }
129    fn free_memory(&mut self) {
130        std::mem::take(&mut self.target);
131    }
132    fn get_inserted_data(&self) -> &[u8] {
133        unsafe {
134            std::slice::from_raw_parts(
135                self.ptr.sub(self.bytes_inserted),
136                self.bytes_inserted,
137            )
138        }
139    }
140
141    fn truncate(&mut self, len: usize) {
142        if self.bytes_inserted <= len {
143            return;
144        }
145        self.bytes_inserted = len;
146        self.target.truncate(len);
147    }
148}
149
150pub struct BytesInsertionStream<'a>(RawBytesInserter<'a>);
151impl<'a> BytesInsertionStream<'a> {
152    pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
153        Self(RawBytesInserter::new(fd, run_len))
154    }
155    pub fn commit(self) {
156        drop(self)
157    }
158    pub fn commit_maybe_text(mut self) {
159        unsafe { self.0.commit_maybe_text() }
160        self.abort();
161    }
162    pub unsafe fn commit_as_text(mut self) {
163        unsafe { self.0.commit(true) }
164        self.abort();
165    }
166    pub fn abort(mut self) {
167        self.0.free_memory();
168        std::mem::forget(self);
169    }
170    pub fn get_inserted_data(&self) -> &[u8] {
171        self.0.get_inserted_data()
172    }
173    pub fn truncate(&mut self, len: usize) {
174        self.0.truncate(len)
175    }
176}
177impl Drop for BytesInsertionStream<'_> {
178    fn drop(&mut self) {
179        unsafe { self.0.commit(false) }
180    }
181}
182
183impl Write for BytesInsertionStream<'_> {
184    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
185        self.0.write_bytes(buf);
186        Ok(buf.len())
187    }
188
189    fn flush(&mut self) -> std::io::Result<()> {
190        Ok(())
191    }
192}
193
194pub struct TextInsertionStream<'a>(RawBytesInserter<'a>);
195impl<'a> TextInsertionStream<'a> {
196    pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
197        Self(RawBytesInserter::new(fd, run_len))
198    }
199    pub fn commit(self) {
200        drop(self)
201    }
202    pub fn abort(mut self) {
203        self.0.free_memory();
204        std::mem::forget(self);
205    }
206    pub fn get_inserted_data(&self) -> &str {
207        // SAFETY: TextWrite guaranteed that only valid utf-8 is in here
208        // (because our `write_text_unchecked` impl never partially succeeds)
209        unsafe { std::str::from_utf8_unchecked(self.0.get_inserted_data()) }
210    }
211}
212impl TextWrite for TextInsertionStream<'_> {
213    unsafe fn write_text_unchecked(
214        &mut self,
215        buf: &[u8],
216    ) -> std::io::Result<usize> {
217        self.0.write_bytes(buf);
218        Ok(buf.len())
219    }
220
221    fn flush_text(&mut self) -> std::io::Result<()> {
222        Ok(())
223    }
224}
225impl Drop for TextInsertionStream<'_> {
226    fn drop(&mut self) {
227        unsafe { self.0.commit(true) }
228    }
229}
230
231pub struct MaybeTextInsertionStream<'a> {
232    base: RawBytesInserter<'a>,
233    is_text: bool,
234}
235impl<'a> MaybeTextInsertionStream<'a> {
236    pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
237        Self {
238            base: RawBytesInserter::new(fd, run_len),
239            is_text: true,
240        }
241    }
242    pub fn commit(self) {
243        drop(self)
244    }
245    pub fn abort(mut self) {
246        self.base.free_memory();
247        std::mem::forget(self);
248    }
249    pub unsafe fn set_is_text(&mut self, is_text: bool) {
250        self.is_text = is_text;
251    }
252    pub fn is_text(&self) -> bool {
253        self.is_text
254    }
255    pub fn get_inserted_data(&self) -> MaybeTextRef {
256        let data = self.base.get_inserted_data();
257        if self.is_text {
258            return unsafe {
259                MaybeTextRef::Text(std::str::from_utf8_unchecked(data))
260            };
261        }
262        MaybeTextRef::Bytes(data)
263    }
264}
265impl TextWrite for MaybeTextInsertionStream<'_> {
266    unsafe fn write_text_unchecked(
267        &mut self,
268        buf: &[u8],
269    ) -> std::io::Result<usize> {
270        self.base.write_bytes(buf);
271        Ok(buf.len())
272    }
273
274    fn flush_text(&mut self) -> std::io::Result<()> {
275        Ok(())
276    }
277}
278impl std::io::Write for MaybeTextInsertionStream<'_> {
279    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
280        self.is_text = false;
281        self.base.write_bytes(buf);
282        Ok(buf.len())
283    }
284
285    fn flush(&mut self) -> std::io::Result<()> {
286        Ok(())
287    }
288}
289
290impl MaybeTextWrite for MaybeTextInsertionStream<'_> {
291    fn as_text_write(&mut self) -> &mut dyn TextWrite {
292        self
293    }
294    fn as_io_write(&mut self) -> &mut dyn std::io::Write {
295        self
296    }
297    fn deref_dyn(&mut self) -> &mut dyn MaybeTextWrite {
298        self
299    }
300}
301impl Drop for MaybeTextInsertionStream<'_> {
302    fn drop(&mut self) {
303        unsafe { self.base.commit(self.is_text) }
304    }
305}