typeline_core/record_data/
bytes_insertion_stream.rs1use std::io::Write;
2
3use bstr::ByteSlice;
4
5use crate::utils::{
6 maybe_text::MaybeTextRef,
7 text_write::{MaybeTextWrite, TextWrite},
8};
9
10use super::{
11 field_data::{
12 field_value_flags, FieldData, FieldValueFormat, FieldValueRepr,
13 INLINE_STR_MAX_LEN,
14 },
15 push_interface::PushInterface,
16};
17
18struct RawBytesInserter<'a> {
19 fd: &'a mut FieldData,
20 run_len: usize,
21 ptr: *mut u8,
22 bytes_inserted: usize,
23 cap: usize,
24 target: Vec<u8>,
26}
27
28impl<'a> RawBytesInserter<'a> {
29 pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
30 Self {
31 ptr: fd.data.tail_ptr_mut(),
32 bytes_inserted: 0,
33 cap: fd.data.contiguous_tail_space_available(),
34 target: Vec::new(),
35 fd,
36 run_len,
37 }
38 }
39 fn write_bytes(&mut self, buf: &[u8]) {
40 let buf_len = buf.len();
41 let len_new = self.bytes_inserted + buf_len;
42 if len_new > INLINE_STR_MAX_LEN {
43 if self.target.is_empty() {
44 self.target.extend_from_slice(unsafe {
45 std::slice::from_raw_parts(
46 self.ptr.sub(self.bytes_inserted),
47 self.bytes_inserted,
48 )
49 });
50 }
51 self.target.extend(buf);
52 self.bytes_inserted = len_new;
53 return;
54 }
55
56 if len_new > self.cap {
57 let data_len = self.fd.data.len();
58 unsafe {
59 self.fd.data.set_len(data_len + self.bytes_inserted);
60 self.fd
61 .data
62 .reserve_contiguous(len_new, self.bytes_inserted);
63 self.fd.data.set_len(data_len);
64 }
65 self.ptr = self.fd.data.tail_ptr_mut();
66 self.cap = self.fd.data.contiguous_tail_space_available();
67 }
68 unsafe {
69 std::ptr::copy_nonoverlapping(buf.as_ptr(), self.ptr, buf_len);
70 self.ptr = self.ptr.add(buf_len);
71 }
72 self.bytes_inserted = len_new;
73 }
74 unsafe fn commit_inline(&mut self, text: bool) {
75 let fmt = FieldValueFormat {
76 repr: if text {
77 FieldValueRepr::TextInline
78 } else {
79 FieldValueRepr::BytesInline
80 },
81 flags: field_value_flags::SHARED_VALUE,
82 size: self.bytes_inserted as u16,
83 };
84 self.fd.field_count += self.run_len;
85 let header_rle = self.run_len == 1
86 && self
87 .fd
88 .headers
89 .back()
90 .map(|h| h.is_value_appendable(fmt))
91 .unwrap_or(false);
92 unsafe {
93 self.fd
94 .data
95 .set_len(self.fd.data.len() + self.bytes_inserted);
96 self.fd.add_header_for_single_value(
97 fmt,
98 self.run_len,
99 header_rle,
100 false,
101 )
102 }
103 }
104 unsafe fn commit_heap(&mut self, text: bool) {
105 let buffer = std::mem::take(&mut self.target);
106 if text {
107 self.fd.push_string(
108 unsafe { String::from_utf8_unchecked(buffer) },
109 self.run_len,
110 true,
111 false,
112 );
113 } else {
114 self.fd.push_bytes_buffer(buffer, self.run_len, true, false);
115 }
116 }
117 unsafe fn commit(&mut self, text: bool) {
118 unsafe {
119 if self.bytes_inserted > INLINE_STR_MAX_LEN {
120 self.commit_heap(text);
121 return;
122 }
123 self.commit_inline(text);
124 }
125 }
126 unsafe fn commit_maybe_text(&mut self) {
127 unsafe { self.commit(self.get_inserted_data().is_utf8()) }
128 }
129 fn free_memory(&mut self) {
130 std::mem::take(&mut self.target);
131 }
132 fn get_inserted_data(&self) -> &[u8] {
133 unsafe {
134 std::slice::from_raw_parts(
135 self.ptr.sub(self.bytes_inserted),
136 self.bytes_inserted,
137 )
138 }
139 }
140
141 fn truncate(&mut self, len: usize) {
142 if self.bytes_inserted <= len {
143 return;
144 }
145 self.bytes_inserted = len;
146 self.target.truncate(len);
147 }
148}
149
150pub struct BytesInsertionStream<'a>(RawBytesInserter<'a>);
151impl<'a> BytesInsertionStream<'a> {
152 pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
153 Self(RawBytesInserter::new(fd, run_len))
154 }
155 pub fn commit(self) {
156 drop(self)
157 }
158 pub fn commit_maybe_text(mut self) {
159 unsafe { self.0.commit_maybe_text() }
160 self.abort();
161 }
162 pub unsafe fn commit_as_text(mut self) {
163 unsafe { self.0.commit(true) }
164 self.abort();
165 }
166 pub fn abort(mut self) {
167 self.0.free_memory();
168 std::mem::forget(self);
169 }
170 pub fn get_inserted_data(&self) -> &[u8] {
171 self.0.get_inserted_data()
172 }
173 pub fn truncate(&mut self, len: usize) {
174 self.0.truncate(len)
175 }
176}
177impl Drop for BytesInsertionStream<'_> {
178 fn drop(&mut self) {
179 unsafe { self.0.commit(false) }
180 }
181}
182
183impl Write for BytesInsertionStream<'_> {
184 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
185 self.0.write_bytes(buf);
186 Ok(buf.len())
187 }
188
189 fn flush(&mut self) -> std::io::Result<()> {
190 Ok(())
191 }
192}
193
194pub struct TextInsertionStream<'a>(RawBytesInserter<'a>);
195impl<'a> TextInsertionStream<'a> {
196 pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
197 Self(RawBytesInserter::new(fd, run_len))
198 }
199 pub fn commit(self) {
200 drop(self)
201 }
202 pub fn abort(mut self) {
203 self.0.free_memory();
204 std::mem::forget(self);
205 }
206 pub fn get_inserted_data(&self) -> &str {
207 unsafe { std::str::from_utf8_unchecked(self.0.get_inserted_data()) }
210 }
211}
212impl TextWrite for TextInsertionStream<'_> {
213 unsafe fn write_text_unchecked(
214 &mut self,
215 buf: &[u8],
216 ) -> std::io::Result<usize> {
217 self.0.write_bytes(buf);
218 Ok(buf.len())
219 }
220
221 fn flush_text(&mut self) -> std::io::Result<()> {
222 Ok(())
223 }
224}
225impl Drop for TextInsertionStream<'_> {
226 fn drop(&mut self) {
227 unsafe { self.0.commit(true) }
228 }
229}
230
231pub struct MaybeTextInsertionStream<'a> {
232 base: RawBytesInserter<'a>,
233 is_text: bool,
234}
235impl<'a> MaybeTextInsertionStream<'a> {
236 pub fn new(fd: &'a mut FieldData, run_len: usize) -> Self {
237 Self {
238 base: RawBytesInserter::new(fd, run_len),
239 is_text: true,
240 }
241 }
242 pub fn commit(self) {
243 drop(self)
244 }
245 pub fn abort(mut self) {
246 self.base.free_memory();
247 std::mem::forget(self);
248 }
249 pub unsafe fn set_is_text(&mut self, is_text: bool) {
250 self.is_text = is_text;
251 }
252 pub fn is_text(&self) -> bool {
253 self.is_text
254 }
255 pub fn get_inserted_data(&self) -> MaybeTextRef {
256 let data = self.base.get_inserted_data();
257 if self.is_text {
258 return unsafe {
259 MaybeTextRef::Text(std::str::from_utf8_unchecked(data))
260 };
261 }
262 MaybeTextRef::Bytes(data)
263 }
264}
265impl TextWrite for MaybeTextInsertionStream<'_> {
266 unsafe fn write_text_unchecked(
267 &mut self,
268 buf: &[u8],
269 ) -> std::io::Result<usize> {
270 self.base.write_bytes(buf);
271 Ok(buf.len())
272 }
273
274 fn flush_text(&mut self) -> std::io::Result<()> {
275 Ok(())
276 }
277}
278impl std::io::Write for MaybeTextInsertionStream<'_> {
279 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
280 self.is_text = false;
281 self.base.write_bytes(buf);
282 Ok(buf.len())
283 }
284
285 fn flush(&mut self) -> std::io::Result<()> {
286 Ok(())
287 }
288}
289
290impl MaybeTextWrite for MaybeTextInsertionStream<'_> {
291 fn as_text_write(&mut self) -> &mut dyn TextWrite {
292 self
293 }
294 fn as_io_write(&mut self) -> &mut dyn std::io::Write {
295 self
296 }
297 fn deref_dyn(&mut self) -> &mut dyn MaybeTextWrite {
298 self
299 }
300}
301impl Drop for MaybeTextInsertionStream<'_> {
302 fn drop(&mut self) {
303 unsafe { self.base.commit(self.is_text) }
304 }
305}