// ms_pdb_msf/write.rs
1use super::*;
2use std::collections::hash_map::Entry;
3use std::io::Write;
4use tracing::{trace, trace_span};
5
6impl<'a, F: ReadAt + WriteAt> StreamWriter<'a, F> {
    /// Writes data to a stream at a given offset. This is the main driver for all `write()` calls
    /// and their variants.
    ///
    /// This function has to handle a lot of complexity:
    /// * alignment of the starting position to a page boundary
    /// * alignment of the ending position to a page boundary
    /// * allocating pages for new pages
    /// * allocating pages for copy-on-write
    /// * updating the size of a stream
    /// * writing zeroes into regions that were implicitly created
    ///
    /// On success the entire buffer has been written; the caller's write position is then
    /// immediately after the buffer that was provided.
    ///
    /// This implementation is input-dependent. That is, we will drive all of our state transitions
    /// by "walking" through the offsets in the stream, from 0 to the end of the stream or the end
    /// of the transfer, whichever is greater. The phases, in order, are:
    /// zero-extend (if `offset` is beyond the end), overwrite (if `offset` is inside the
    /// stream), the unaligned first appended page, whole appended pages, and finally the
    /// unaligned last appended page. Each phase consumes a prefix of `buf` and advances `pos`.
    ///
    /// For some operations (read-modify-write cycles), we use a temporary page buffer.
    ///
    /// This function always writes all of the data in `buf`. If it cannot, it returns `Err`.
    #[inline(never)]
    pub(super) fn write_core(&mut self, mut buf: &[u8], offset: u64) -> std::io::Result<()> {
        let _span = trace_span!("StreamWriter::write_core").entered();

        if buf.is_empty() {
            return Ok(());
        }

        // Writing to a nil stream implicitly promotes it to a zero-length stream.
        if *self.size == NIL_STREAM_SIZE {
            *self.size = 0;
        }

        let page_size = self.page_allocator.page_size;

        // Validate the ranges of our inputs. We validate these now so that we can compute values
        // that depend on them without worrying about overflow. Stream offsets and sizes are
        // 32-bit quantities in the MSF format.
        let Ok(buf_len) = u32::try_from(buf.len()) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };
        let Ok(mut pos) = u32::try_from(offset) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };
        let Some(_buf_end) = pos.checked_add(buf_len) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };

        // Is there any implicit zero extension happening? If so, handle the zero extension now.
        // Note that this may transfer a small prefix of buf, if the end of the zero-extension
        // region is unaligned (i.e. pos is unaligned). If that consumes all of the data in buf,
        // then we finish early.
        if *self.size < pos {
            self.write_zero_extend(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
            assert_eq!(pos, *self.size);
        }

        assert!(!buf.is_empty());
        assert!(pos <= *self.size);

        // Are we doing any overwrite? (Writing over bytes that already exist in the stream.)
        if pos < *self.size {
            self.write_overwrite(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
            assert_eq!(pos, *self.size);
        }

        assert!(!buf.is_empty());
        assert_eq!(pos, *self.size);

        // Does the write position start at an unaligned page boundary?
        // If so, do a read-modify-write of the final existing page.
        if !page_size.is_aligned(pos) {
            self.write_unaligned_start_page(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
        }

        assert!(!buf.is_empty());
        assert_eq!(pos, *self.size);
        assert!(page_size.is_aligned(pos));

        // From this point on, we no longer need to cow pages.
        // All pages that we write will be newly-allocated pages.

        self.write_append_complete_pages(&mut buf, &mut pos)?;
        if buf.is_empty() {
            return Ok(());
        }

        self.write_append_final_unaligned_page(&mut buf, &mut pos)?;
        assert!(buf.is_empty());

        Ok(())
    }
105
106 /// Append complete pages to the file.
107 fn write_append_complete_pages(
108 &mut self,
109 buf: &mut &[u8],
110 pos: &mut u32,
111 ) -> std::io::Result<()> {
112 let page_size = self.page_allocator.page_size;
113
114 assert_eq!(*self.size, *pos);
115 assert!(page_size.is_aligned(*pos));
116
117 // Append complete pages. We do this iteratively so that we can minimize our calls to
118 // alloc_pages(). Each iteration of this loop allocates a contiguous run of pages and
119 // uses a single write() call to the lower level for transferring data.
120 //
121 // This is expected to be the main "hot path" for generating new PDBs (not just editing
122 // existing ones). The page allocator should usually give us a long run of pages; it only
123 // needs to break up runs when we cross interval boundaries (because of the FPM pages).
124
125 loop {
126 let num_pages_wanted = (buf.len() as u32) / page_size;
127 if num_pages_wanted == 0 {
128 break;
129 }
130
131 // Allocate pages, add them to our stream, and update the stream size.
132 // These must all be done before I/O so that our in-memory state is consistent.
133 let (first_page, run_len) = self.page_allocator.alloc_pages(num_pages_wanted);
134 assert!(run_len > 0);
135 let xfer_len: u32 = run_len << page_size.exponent();
136 for i in 0..run_len {
137 self.pages.push(first_page + i);
138 }
139
140 let buf_head = take_n(buf, xfer_len as usize);
141
142 let file_offset = page_to_offset(first_page, page_size);
143
144 trace!(
145 stream_pos = *pos,
146 first_page = first_page,
147 file_offset,
148 xfer_len,
149 "write_append_complete_pages"
150 );
151
152 self.file.write_all_at(buf_head, file_offset)?;
153
154 *self.size += xfer_len;
155 *pos += xfer_len;
156 }
157
158 Ok(())
159 }
160
161 /// Append the final unaligned page to the file, if any.
162 fn write_append_final_unaligned_page(
163 &mut self,
164 buf: &mut &[u8],
165 pos: &mut u32,
166 ) -> std::io::Result<()> {
167 let page_size = self.page_allocator.page_size;
168
169 assert_eq!(*self.size, *pos);
170 assert!(page_size.is_aligned(*pos));
171
172 // The only thing left is a single partial page at the end. We use the page buffer so that we
173 // are sending down complete page writes to the lower-level storage device.
174 assert!(buf.len() < usize::from(page_size));
175 if buf.is_empty() {
176 return Ok(());
177 }
178
179 let page = self.page_allocator.alloc_page();
180
181 let mut page_buffer = self.page_allocator.alloc_page_buffer();
182 page_buffer[..buf.len()].copy_from_slice(buf);
183 page_buffer[buf.len()..].fill(0);
184
185 self.pages.push(page);
186 *self.size += buf.len() as u32;
187
188 let file_offset = page_to_offset(page, page_size);
189
190 trace!(
191 stream_pos = *pos,
192 page = page,
193 file_offset,
194 unaligned_len = buf.len(),
195 "write_append_final_unaligned_page"
196 );
197
198 self.file.write_all_at(&page_buffer, file_offset)?;
199
200 *pos += buf.len() as u32;
201 *buf = &[];
202
203 Ok(())
204 }
205
    /// Handles zero-extending a stream. This occurs when the write position is beyond the
    /// current size of the stream. This implicitly writes zeroes from the old end of the stream
    /// to the start of the write request.
    ///
    /// This implementation will transfer the prefix of `buf` if `pos` is unaligned. If `buf` fits
    /// entirely on one page, then this finishes the entire transfer. If some portion of `buf` is
    /// transferred, then it will be written to 1 or 2 pages. It will be written to 2 pages if it
    /// crosses a page boundary.
    ///
    /// The three sub-steps run in order: patch the unaligned final page of the old stream,
    /// then emit whole zero pages, then build the unaligned page where the new data begins.
    fn write_zero_extend(&mut self, buf: &mut &[u8], pos: &mut u32) -> std::io::Result<()> {
        let page_size = self.page_allocator.page_size;

        // Step 1: rebuild the (possibly unaligned) last page of the old stream. This may
        // consume a prefix of `buf` if the new data begins on that same page.
        self.write_zero_extend_unaligned_start(buf, pos)?;
        if buf.is_empty() {
            return Ok(());
        }

        // If we have more bytes to write, then write_zero_extend_unaligned_start() should
        // have aligned the stream size to a page boundary.
        assert!(page_size.is_aligned(*self.size));

        // Step 2: emit whole zero pages up to the page boundary at or below `pos`.
        if *self.size < *pos {
            self.write_zero_extend_whole_pages(*pos)?;
        }

        // Step 3: if a sub-page zero gap remains, build the final page from zeroes plus a
        // prefix of `buf`.
        if *self.size < *pos {
            self.write_zero_extend_unaligned_end(buf, pos)?;
        }

        Ok(())
    }
236
    /// Handles the first page of a zero-extension: the page containing the current
    /// (possibly unaligned) end of the stream.
    ///
    /// If the current end of the stream is page-aligned, this does nothing. Otherwise it
    /// rebuilds that final page in a buffer — old data, then zeroes, then (if the new data
    /// begins on this same page) a prefix of `buf` — and writes it with copy-on-write.
    ///
    /// # Requires
    /// * `*self.size < *pos` (the caller checked that zero-extension is needed)
    fn write_zero_extend_unaligned_start(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert!(*self.size < *pos); // caller should have already checked this
        let num_zx = *pos - *self.size; // number of zero-extend bytes we need

        let page_size = self.page_allocator.page_size;

        // current end of the stream (page index within the stream, and offset within that page)
        let end_spage: StreamPage = *self.size >> page_size.exponent();
        let end_phase = offset_within_page(*self.size, page_size);

        // where the new data begins
        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size);

        if end_phase != 0 {
            // The end of the stream is not page-aligned and we are extending the end of the stream
            // by one or more bytes.

            // Prepare the page buffer that we are assembling. Zero-fill it, so the
            // zero-extend region (and any tail) needs no further clearing below.
            let mut page_buffer = self.page_allocator.alloc_page_buffer();
            page_buffer.fill(0);

            // Read the old data from the page. The read starts at phase 0 within the page and ends
            // at end_phase. If this fails, that's OK, we haven't made any state changes yet and the
            // error propagates.
            {
                let file_page = self.pages[end_spage as usize];
                let file_offset = page_to_offset(file_page, page_size);
                self.file
                    .read_exact_at(&mut page_buffer[..end_phase as usize], file_offset)?;
            }

            if end_spage == pos_spage {
                // The stream ends on the same page that new-data begins on, and the end of
                // the stream is unaligned. This means that we have a very complicated page to
                // deal with. It has old stream data, zero-extend bytes, and 1 or more bytes of
                // new data. We also have to deal with copy-on-write for the page.
                //
                // We expect zero-extending to be a rare case. We implement this by allocating a
                // page buffer, reading the unaligned piece of the old page, zeroing the middle,
                // copying the unaligned piece of the new data, and optionally zeroing the tail.
                // Then we allocate a fresh page (if needed) and write the page data that we built.
                //
                // within end_spage:
                // |------old-data-------|-------zeroes------|-------new-data-----|----
                //                       |                   |
                //                       end_phase           |
                //                                           |
                //                                           pos_phase

                assert!(end_phase <= pos_phase);

                // Copy the new data into the page buffer. The new data may end within this page
                // or may cross the boundary to the next page, which is what the min() call handles.
                let buf_head_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
                let buf_head = take_n(buf, buf_head_len);
                page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);

                // Move pos because we have consumed data from 'buf'
                *pos += buf_head_len as u32;

                // All of the zero-extend bytes landed in this first page, so the stream
                // grows by the zero bytes plus the new data we copied in; size ends equal
                // to the updated pos.
                *self.size += num_zx;
                *self.size += buf_head_len as u32;
            } else {
                // The new data does not overlap the page we are working on. That means the
                // zero-extend region reaches to the end of the page.
                //
                // within end_spage:
                // |------old-data-------|-------zeroes-------------------------------|
                //                       |                                            |
                //                       end_phase                                    |
                //                                                                    page_size

                let num_zx_this_page = u32::from(page_size) - end_phase;
                *self.size += num_zx_this_page;
            }

            // COW the page and write it.
            self.cow_page_and_write(end_spage, &page_buffer)?;
        }

        Ok(())
    }
334
335 /// Writes zero or more complete zero pages during zero-extension. The size of the stream has
336 /// already been aligned to a page boundary.
337 ///
338 /// This does not read data from the current transfer request, so it does not need `buf`.
339 /// It also does not change `pos`.
340 fn write_zero_extend_whole_pages(&mut self, pos: u32) -> std::io::Result<()> {
341 let page_size = self.page_allocator.page_size;
342
343 assert!(*self.size <= pos);
344 assert!(page_size.is_aligned(*self.size));
345
346 if (pos - *self.size) / page_size == 0 {
347 return Ok(());
348 }
349
350 let mut page_buffer = self.page_allocator.alloc_page_buffer();
351 page_buffer.fill(0); // probably redundant
352
353 loop {
354 let num_pages_wanted = (pos - *self.size) / page_size;
355 if num_pages_wanted == 0 {
356 break;
357 }
358
359 let (first_page, run_len) = self.page_allocator.alloc_pages(num_pages_wanted);
360 assert!(run_len > 0);
361 for i in 0..run_len {
362 self.pages.push(first_page + i);
363 }
364
365 let run_size_bytes = run_len << page_size.exponent();
366 *self.size += run_size_bytes;
367
368 assert!(*self.size <= pos);
369
370 // Write the zeroed pages.
371 for i in 0..run_len {
372 let page = first_page + i;
373 self.file
374 .write_at(&page_buffer, page_to_offset(page, page_size))?;
375 }
376 }
377
378 Ok(())
379 }
380
381 /// If the zero-extend region ends with an unaligned final page, then this will write that page.
382 /// This may transfer data from `buf`.
383 fn write_zero_extend_unaligned_end(
384 &mut self,
385 buf: &mut &[u8],
386 pos: &mut u32,
387 ) -> std::io::Result<()> {
388 let page_size = self.page_allocator.page_size;
389
390 assert!(*self.size <= *pos);
391 assert!(page_size.is_aligned(*self.size));
392
393 // We should have at most a partial page.
394 let num_zx_bytes = *pos - *self.size;
395 assert!(num_zx_bytes < u32::from(page_size));
396
397 if num_zx_bytes == 0 {
398 return Ok(());
399 }
400
401 let mut page_buffer = self.page_allocator.alloc_page_buffer();
402
403 page_buffer[0..num_zx_bytes as usize].fill(0);
404
405 let num_data_len = buf
406 .len()
407 .min(usize::from(page_size) - num_zx_bytes as usize);
408 let buf_head = take_n(buf, num_data_len);
409 *pos += num_data_len as u32;
410
411 page_buffer[num_zx_bytes as usize..num_zx_bytes as usize + num_data_len]
412 .copy_from_slice(buf_head);
413
414 let end_of_data = num_zx_bytes as usize + num_data_len;
415 page_buffer[end_of_data..usize::from(page_size)].fill(0);
416
417 let page = self.page_allocator.alloc_page();
418 self.pages.push(page);
419 *self.size += end_of_data as u32;
420
421 self.file
422 .write_at(&page_buffer, page_to_offset(page, page_size))?;
423
424 Ok(())
425 }
426
427 /// Handles _most_ of overwrite.
428 ///
429 /// This handles:
430 /// * the unaligned page at the start of overwrite (if any)
431 /// * the complete, aligned pages in the middle of overwrite (if any)
432 ///
433 /// When this returns, even if buf still has data, we do not guarantee that pos == stream_size.
434 fn write_overwrite(&mut self, buf: &mut &[u8], pos: &mut u32) -> std::io::Result<()> {
435 assert!(*pos < *self.size);
436
437 self.write_overwrite_unaligned_start(buf, pos)?;
438 if buf.is_empty() {
439 return Ok(());
440 }
441
442 assert!(self.page_allocator.page_size.is_aligned(*pos));
443
444 self.write_overwrite_aligned_pages(buf, pos)?;
445 if buf.is_empty() {
446 return Ok(());
447 }
448
449 assert!(self.page_allocator.page_size.is_aligned(*pos));
450 assert!(self.page_allocator.page_size.is_aligned(*self.size));
451 Ok(())
452 }
453
    /// Handles writing the first page during overwrite, if the first page is unaligned.
    ///
    /// Performs a read-modify-write: the whole existing page is read into a buffer, the
    /// new data is spliced in at the unaligned offset, and the page is rewritten with
    /// copy-on-write.
    ///
    /// # Requires
    /// * `pos <= stream_size`
    ///
    /// # Ensures
    /// * `buf.is_empty() || pos == stream_size`
    fn write_overwrite_unaligned_start(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert!(*pos <= *self.size);

        let page_size = self.page_allocator.page_size;

        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size);
        if pos_phase == 0 {
            // The overwrite starts on an aligned page boundary. This function has no work to do.
            return Ok(());
        }

        // In this case, we need to assemble a page from some old data and some new data.
        // And because we need to cow the page, it is easier to reassemble everything into a
        // page buffer.
        //
        // Contents of this page:
        //
        //      not empty              not empty              can be empty          can be empty
        // |----old-data----------|------new-data--------|-------old-data------|-----garbage-----
        // 0                      |
        //                        |
        //                        pos_phase
        //
        // At this point, we don't know which subcase we're in. It depends on whether new_data
        // reaches the end of this page and whether writing the new-data extends the stream size.

        // Read the old page data into our page buffer. This makes cow'ing the page easier.
        // We read the entire page because that simplifies the case where the new-data ends
        // before stream_size.
        let mut page_buffer = self.page_allocator.alloc_page_buffer();
        self.read_page(pos_spage, &mut page_buffer)?;

        // Copy the data from 'buf' (new-data) into the page and advance 'buf' and 'pos'.
        // min() handles new-data ending within this page vs. crossing into the next.
        let new_data_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
        assert!(new_data_len > 0);
        let buf_head = take_n(buf, new_data_len);
        page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);
        *pos += new_data_len as u32;

        // Cow the page and write its new contents.
        self.cow_page_and_write(pos_spage, &page_buffer)?;

        // We may have written enough data that we extended stream_size. This only happens if we
        // drag in some of the prefix of buf.
        if *pos > *self.size {
            *self.size = *pos;
        }

        Ok(())
    }
518
    /// If we are doing overwrite and the remaining buffer contains one or more whole pages,
    /// then this function transfers those.
    ///
    /// This function may extend the stream size. If it does, then it will not extend it enough
    /// to cross a page boundary. This form of extension happens when we are overwriting beyond
    /// the existing end of the stream.
    ///
    /// This function will cow pages, but will not allocate new page slots in the stream.
    ///
    /// # Requires
    /// * `pos` is page-aligned
    /// * `pos <= stream_size`
    ///
    /// # Ensures
    /// * `pos` is page-aligned
    /// * `buf.is_empty() || stream_size is page-aligned`
    fn write_overwrite_aligned_pages(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        let page_size = self.page_allocator.page_size;
        assert!(*pos <= *self.size);
        assert!(page_size.is_aligned(*pos));

        // If pos is already at the end of the stream, there is nothing to overwrite.
        if *pos == *self.size {
            return Ok(());
        }

        // The stream page where this transfer will begin (if any).
        let pos_spage = *pos / page_size;

        // Number of complete pages that can be read from buf.
        let num_buf_pages = buf.len() as u32 / page_size;

        // Number of pages at our write position that are assigned to the stream.
        // This includes the partial page at the end, if any.
        let num_pages_total = self.size.div_round_up(page_size);
        assert_eq!(num_pages_total, self.pages.len() as u32);
        let num_pages_at_pos = *pos / page_size;
        let num_pages_owned = num_pages_total - num_pages_at_pos;

        // Number of complete pages we are going to transfer from buf to disk.
        // Since we are writing whole pages, we do not need the old contents of any page.
        // Cow the pages, just so we get fresh pages.
        let num_xfer_pages = num_pages_owned.min(num_buf_pages);
        if num_xfer_pages != 0 {
            trace!(num_pages = num_xfer_pages, "writing whole pages");

            let num_xfer_bytes = num_xfer_pages << page_size.exponent();
            let buf_head = take_n(buf, num_xfer_bytes as usize);
            *pos += num_xfer_bytes;

            // Make all target pages writable (fresh), then write them, grouping
            // consecutive pages into as few I/O calls as possible.
            let pages = &mut self.pages[pos_spage as usize..][..num_xfer_pages as usize];
            self.page_allocator.make_pages_fresh(pages);
            write_runs(&self.file, buf_head, pages, page_size)?;

            // If the last page that we overwrite was a partial page, then we will have extended
            // the size of the stream. This will not extend stream_size beyond a page boundary.
            if *pos > *self.size {
                *self.size = *pos;
            }

            if buf.is_empty() {
                return Ok(());
            }
        }

        assert!(page_size.is_aligned(*pos));
        assert!(*pos <= *self.size);

        // We may have gotten here because buf.len() is now less than a full page size, but we still
        // have another page assigned in the stream. Cow it now.
        if *self.size - *pos > 0 {
            trace!(buf_len = buf.len(), "buffer has partial page remaining.");
            assert!(
                buf.len() < usize::from(page_size),
                "size = {:x}, pos = {:x}, buf.len = 0x{:x}",
                *self.size,
                *pos,
                buf.len()
            );

            let spage = *pos / page_size;
            let old_len = *self.size - *pos;

            let mut page_buffer = self.page_allocator.alloc_page_buffer();
            // Only read the old page if old data extends past the new data; otherwise the
            // new data covers everything we care about. NOTE(review): when the read is
            // skipped, the buffer's tail past the new data is stale, but those bytes lie
            // beyond the new end of the stream — presumably MSF makes no guarantee about
            // them; confirm.
            if old_len > buf.len() as u32 {
                self.read_page(spage, &mut page_buffer)?;
            }

            page_buffer[..buf.len()].copy_from_slice(buf);

            self.cow_page_and_write(spage, &page_buffer)?;
            *pos += buf.len() as u32;
            if *pos > *self.size {
                *self.size = *pos;
            }

            *buf = &[];
            return Ok(());
        }

        assert_eq!(*pos, *self.size);

        Ok(())
    }
626
    /// Writes the unaligned start page at the beginning of the new-data. This page is only
    /// present if `pos` is not aligned.
    ///
    /// Performs a read-modify-write of the existing final page and rewrites it with
    /// copy-on-write. NOTE(review): this closely mirrors `write_overwrite_unaligned_start`,
    /// differing mainly in the trace output and the direct read; consider unifying.
    ///
    /// # Requires
    /// * `stream_size == pos`
    fn write_unaligned_start_page(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert_eq!(*pos, *self.size);

        // In this case, we need to assemble a page from some old data and some new data.
        // And because we need to cow the page, it is easier to reassemble everything into a
        // page buffer.
        //
        // Contents of this page:
        //
        //      not empty              not empty              can be empty          can be empty
        // |----old-data----------|------new-data--------|-------old-data------|-----garbage-----
        // 0                      |
        //                        |
        //                        pos_phase
        //
        // At this point, we don't know which subcase we're in. It depends on whether new_data
        // reaches the end of this page and whether writing the new-data extends the stream size.

        let page_size = self.page_allocator.page_size;

        // where the new data begins
        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size); // <-- we know this is non-zero

        // Read the old page data into our page buffer. This makes cow'ing the page easier.
        // We read the entire page because that simplifies the case where the new-data ends
        // before stream_size.
        let mut page_buffer = self.page_allocator.alloc_page_buffer();

        let file_offset = page_to_offset(self.pages[pos_spage as usize], page_size);
        trace!(
            stream_pos = *pos,
            file_offset,
            len = u32::from(page_size),
            "write_unaligned_start_page: reading existing unaligned data"
        );

        self.file.read_exact_at(&mut page_buffer, file_offset)?;

        // Copy the data from 'buf' (new-data) into the page and advance 'buf' and 'pos'.
        // min() handles new-data ending within this page vs. crossing into the next.
        let new_data_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
        assert!(new_data_len > 0);
        let buf_head = take_n(buf, new_data_len);
        page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);
        *pos += new_data_len as u32;

        // Cow the page and write its new contents.
        self.cow_page_and_write(pos_spage, &page_buffer)?;

        // We may have written enough data that we extended stream_size.
        if *pos > *self.size {
            *self.size = *pos;
        }

        Ok(())
    }
694
695 /// Ensures that a stream page is writable (is "fresh"). If the page is not writable, then it
696 /// allocates a new page. This function returns the page number of the writable page.
697 fn cow_page(&mut self, spage: StreamPage) -> Page {
698 self.page_allocator
699 .make_page_fresh(&mut self.pages[spage as usize])
700 }
701
702 /// Ensures that a stream page is writable and then writes it.
703 ///
704 /// `data` should contain at most one page of data.
705 pub(super) fn cow_page_and_write(
706 &mut self,
707 spage: StreamPage,
708 data: &[u8],
709 ) -> std::io::Result<()> {
710 // At most one full page of data can be written.
711 debug_assert!(
712 data.len() <= usize::from(self.page_allocator.page_size),
713 "buffer cannot exceed size of a single page"
714 );
715
716 let page = self.cow_page(spage);
717 let file_offset = page_to_offset(page, self.page_allocator.page_size);
718
719 trace!(
720 stream_page = spage,
721 file_offset,
722 len = u32::from(self.page_allocator.page_size),
723 "cow_page_and_write"
724 );
725
726 self.file.write_all_at(data, file_offset)
727 }
728
729 /// Reads a stream page. The length of `data` must be exactly one page, or less. It cannot
730 /// cross page boundaries.
731 pub(super) fn read_page(
732 &self,
733 stream_page: StreamPage,
734 data: &mut [u8],
735 ) -> std::io::Result<()> {
736 debug_assert!(
737 data.len() <= usize::from(self.page_allocator.page_size),
738 "buffer cannot exceed size of a single page"
739 );
740
741 let page = self.pages[stream_page as usize];
742 let offset = page_to_offset(page, self.page_allocator.page_size);
743 self.file.read_exact_at(data, offset)
744 }
745
746 /// The caller **must** guarantee that this page is already writable.
747 pub(super) fn write_page(&self, stream_page: StreamPage, data: &[u8]) -> std::io::Result<()> {
748 let page = self.pages[stream_page as usize];
749 assert!(
750 self.page_allocator.fresh[page as usize],
751 "page is required to be fresh (writable)"
752 );
753
754 let file_offset = page_to_offset(page, self.page_allocator.page_size);
755
756 trace!(
757 stream_page,
758 page = page,
759 file_offset,
760 len = u32::from(self.page_allocator.page_size),
761 "write_page"
762 );
763
764 self.file.write_all_at(data, file_offset)
765 }
766}
767
768/// Finds the length of the prefix of pages in `pages` that are numbered sequentially.
769///
770/// This function assumes that there are no entries with the value 0xffff_ffff. (That would cause
771/// overflow.)
772fn find_longest_page_run(pages: &[Page]) -> usize {
773 if pages.is_empty() {
774 0
775 } else {
776 let mut prev = pages[0];
777 let mut i = 1;
778 while i < pages.len() && pages[i] == prev + 1 {
779 prev = pages[i];
780 i += 1;
781 }
782 i
783 }
784}
785
#[test]
fn test_find_longest_page_run() {
    // Empty input has no run.
    assert_eq!(find_longest_page_run(&[]), 0);
    // A single element is always a run of 1.
    assert_eq!(find_longest_page_run(&[1]), 1);
    // Fully consecutive.
    assert_eq!(find_longest_page_run(&[1, 2, 3]), 3);
    // The run stops at the first non-increment.
    assert_eq!(find_longest_page_run(&[1, 3, 2]), 1);
    // The run ends at the first gap; later values are ignored.
    assert_eq!(find_longest_page_run(&[1, 2, 3, 9, 9, 9]), 3);
    // A run does not need to start at page 1.
    assert_eq!(find_longest_page_run(&[7, 8, 9]), 3);
    // Equal neighbors do not extend a run.
    assert_eq!(find_longest_page_run(&[5, 5]), 1);
    // Descending values stop the run immediately.
    assert_eq!(find_longest_page_run(&[4, 3]), 1);
}
794
795/// Given a page map that corresponds to a buffer of data to write, write all of the data.
796/// Write it in a sequence of function calls that group together consecutive pages, so that
797/// we minimize the number of write() calls.
798fn write_runs<F: WriteAt>(
799 file: &F,
800 mut buf: &[u8],
801 pages: &[Page],
802 page_size: PageSize,
803) -> std::io::Result<()> {
804 let mut region_pages = pages;
805
806 assert_eq!(buf.len(), pages.len() << page_size.exponent());
807
808 loop {
809 let run_len = find_longest_page_run(region_pages);
810 if run_len == 0 {
811 break;
812 }
813
814 let page0: Page = region_pages[0];
815 let xfer_len: usize = run_len << page_size.exponent();
816 let buf_head = take_n(&mut buf, xfer_len);
817
818 // Write the run of pages. If this write fails, then the contents of the stream are
819 // now in an undefined state.
820 file.write_at(buf_head, page_to_offset(page0, page_size))?;
821
822 // Advance iterator state
823 region_pages = ®ion_pages[run_len..];
824 }
825
826 Ok(())
827}
828
829impl<F> Msf<F> {
830 /// Adds a new stream to the MSF file. The stream has a length of zero.
831 pub fn new_stream(&mut self) -> anyhow::Result<(u32, StreamWriter<'_, F>)> {
832 let _span = trace_span!("new_stream").entered();
833
834 self.requires_writeable()?;
835 self.check_can_add_stream()?;
836
837 let new_stream_index = self.stream_sizes.len() as u32;
838 trace!(new_stream_index);
839
840 self.stream_sizes.push(0);
841 let size = self.stream_sizes.last_mut().unwrap();
842
843 let pages = match self.modified_streams.entry(new_stream_index) {
844 Entry::Occupied(_) => {
845 panic!("Found entry in modified streams table that should not be present.")
846 }
847 Entry::Vacant(v) => v.insert(Vec::new()),
848 };
849
850 Ok((
851 new_stream_index,
852 StreamWriter {
853 stream: new_stream_index,
854 file: &self.file,
855 size,
856 page_allocator: &mut self.pages,
857 pos: 0,
858 pages,
859 },
860 ))
861 }
862
863 fn check_can_add_stream(&self) -> anyhow::Result<()> {
864 if self.stream_sizes.len() as u32 >= self.max_streams {
865 bail!(
866 "A new stream cannot be created because the maximum number of streams has been reached."
867 );
868 }
869 Ok(())
870 }
871
872 /// Adds a new stream to the MSF file, given the byte contents. This function returns the
873 /// stream index of the new stream.
874 pub fn new_stream_data(&mut self, data: &[u8]) -> anyhow::Result<u32>
875 where
876 F: ReadAt + WriteAt,
877 {
878 let (stream_index, mut writer) = self.new_stream()?;
879 writer.set_contents(data)?;
880 Ok(stream_index)
881 }
882
883 /// Adds a new nil stream to the MSF file.
884 pub fn nil_stream(&mut self) -> anyhow::Result<u32> {
885 self.requires_writeable()?;
886 self.check_can_add_stream()?;
887
888 let new_stream_index = self.stream_sizes.len() as u32;
889 self.stream_sizes.push(NIL_STREAM_SIZE);
890
891 match self.modified_streams.entry(new_stream_index) {
892 Entry::Occupied(_) => {
893 panic!("Found entry in modified streams table that should be present.")
894 }
895 Entry::Vacant(v) => {
896 v.insert(Vec::new());
897 }
898 }
899
900 Ok(new_stream_index)
901 }
902
    /// Given the stream index for a stream, returns a `StreamWriter` that allows read/write
    /// for the stream.
    ///
    /// If `stream` is out of range for the current set of streams, then the set of streams is
    /// increased until `stream` is in range. For example, if a new MSF file is created, then
    /// it is legal to immediately call `msf.write_stream(10)` on it. This will expand the Stream
    /// Directory so that `num_streams()` returns 11 (because it must include the new stream index).
    /// All streams lower than `stream` will be allocated as nil streams.
    ///
    /// If `stream` is currently a nil stream, then this function promotes the stream to a
    /// non-nil stream.
    pub fn write_stream(&mut self, stream: u32) -> anyhow::Result<StreamWriter<'_, F>> {
        assert!(stream <= MAX_STREAM);
        self.requires_writeable()?;

        // Grow the directory with nil streams until `stream` is a valid index.
        // (nil_stream also registers each new index in modified_streams.)
        while (self.stream_sizes.len() as u32) <= stream {
            _ = self.nil_stream()?;
        }

        let Some(size) = self.stream_sizes.get_mut(stream as usize) else {
            bail!("Stream index is out of range");
        };

        // If the stream is currently a nil stream, then promote it to a zero-length stream.
        if *size == NIL_STREAM_SIZE {
            *size = 0;
        }

        let pages = match self.modified_streams.entry(stream) {
            Entry::Occupied(occ) => occ.into_mut(),
            Entry::Vacant(v) => {
                // Copy the existing page list to a new page list.
                //
                // Copying the page list _does not_ imply that we can safely write to those pages,
                // because they may still be owned by the previous committed state. Copy-on-write
                // is handled elsewhere.
                //
                // NOTE(review): indexing starts[0]..starts[1] assumes
                // committed_stream_page_starts has at least `stream + 2` entries (i.e. a
                // trailing sentinel entry per committed stream) — confirm against the
                // structure's builder.
                let starts = &self.committed_stream_page_starts[stream as usize..];
                let old_pages =
                    &self.committed_stream_pages[starts[0] as usize..starts[1] as usize];
                v.insert(old_pages.to_vec())
            }
        };

        Ok(StreamWriter {
            stream,
            file: &self.file,
            size,
            page_allocator: &mut self.pages,
            pos: 0,
            pages,
        })
    }
955
956 pub(crate) fn requires_writeable(&self) -> anyhow::Result<()> {
957 match self.access_mode {
958 AccessMode::ReadWrite => Ok(()),
959 AccessMode::Read => bail!("This PDB was not opened for read/write access."),
960 }
961 }
962
963 /// Copies a stream from another PDB/MSF into this one.
964 pub fn copy_stream<Input: ReadAt>(
965 &mut self,
966 source: &Msf<Input>,
967 source_stream: u32,
968 ) -> anyhow::Result<u32>
969 where
970 F: ReadAt + WriteAt,
971 {
972 const BUFFER_LEN: usize = 16 << 20; // 16 MiB
973
974 let mut source_reader = source.get_stream_reader(source_stream)?;
975 let source_len = source_reader.len();
976
977 let mut buffer = vec![0; (source_len as usize).min(BUFFER_LEN)];
978 let (dest_stream_index, mut dest_writer) = self.new_stream()?;
979
980 loop {
981 let n = source_reader.read(&mut buffer)?;
982 if n == 0 {
983 break;
984 }
985
986 dest_writer.write_all(&buffer[..n])?;
987 }
988
989 Ok(dest_stream_index)
990 }
991
992 /// Copies a stream that implements `Read` into this PDB/MSF file.
993 pub fn copy_stream_read<Input: Read>(&mut self, source: &mut Input) -> anyhow::Result<u32>
994 where
995 F: ReadAt + WriteAt,
996 {
997 const BUFFER_LEN: usize = 16 << 20; // 16 MiB
998
999 let mut buffer = vec![0; BUFFER_LEN];
1000 let (dest_stream_index, mut dest_writer) = self.new_stream()?;
1001
1002 loop {
1003 let n = source.read(&mut buffer)?;
1004 if n == 0 {
1005 break;
1006 }
1007
1008 dest_writer.write_all(&buffer[..n])?;
1009 }
1010
1011 Ok(dest_stream_index)
1012 }
1013
1014 /// Copies a stream that implements `ReadAt` into this PDB/MSF file.
1015 pub fn copy_stream_read_at<Input: ReadAt>(&mut self, source: &Input) -> anyhow::Result<u32>
1016 where
1017 F: ReadAt + WriteAt,
1018 {
1019 const BUFFER_LEN: usize = 16 << 20; // 16 MiB
1020
1021 let mut buffer = vec![0; BUFFER_LEN];
1022 let (dest_stream_index, mut dest_writer) = self.new_stream()?;
1023
1024 let mut pos: u64 = 0;
1025
1026 loop {
1027 let n = source.read_at(&mut buffer, pos)?;
1028 if n == 0 {
1029 break;
1030 }
1031
1032 dest_writer.write_all(&buffer[..n])?;
1033 pos += n as u64;
1034 }
1035
1036 Ok(dest_stream_index)
1037 }
1038}
1039
/// Splits off and returns the first `n` items of `*items`, advancing `*items` past them.
///
/// Panics if `n > items.len()`.
fn take_n<'a, T>(items: &mut &'a [T], n: usize) -> &'a [T] {
    let (head, tail) = items.split_at(n);
    *items = tail;
    head
}