// ms_pdb_msf/write.rs

1use super::*;
2use std::collections::hash_map::Entry;
3use std::io::Write;
4use tracing::{trace, trace_span};
5
6impl<'a, F: ReadAt + WriteAt> StreamWriter<'a, F> {
    /// Writes data to a stream at a given offset. This is the main driver for all `write()` calls
    /// and their variants.
    ///
    /// This function has to handle a lot of complexity:
    /// * alignment of the starting position to a page boundary
    /// * alignment of the ending position to a page boundary
    /// * allocating pages for new pages
    /// * allocating pages for copy-on-write
    /// * updating the size of a stream
    /// * writing zeroes into regions that were implicitly created
    ///
    /// On success, all of `buf` has been written to the stream at `offset` and the stream size
    /// has been updated (the logical write position is then immediately after `buf`).
    ///
    /// This implementation is input-dependent. That is, we will drive all of our state transitions
    /// by "walking" through the offsets in the stream, from 0 to the end of the stream or the end
    /// of the transfer, whichever is greater.
    ///
    /// For some operations (read-modify-write cycles), we use a temporary page buffer.
    ///
    /// This function always writes all of the data in `buf`. If it cannot, it returns `Err`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `buf.len()`, `offset`, or `offset + buf.len()` does not fit in
    /// a `u32` (stream offsets are 32-bit). I/O errors from the underlying file are propagated.
    #[inline(never)]
    pub(super) fn write_core(&mut self, mut buf: &[u8], offset: u64) -> std::io::Result<()> {
        let _span = trace_span!("StreamWriter::write_core").entered();

        // An empty write is a no-op; note this returns before the NIL-size conversion below.
        if buf.is_empty() {
            return Ok(());
        }

        // A stream whose size is the NIL sentinel becomes a real, zero-length stream on the
        // first non-empty write.
        if *self.size == NIL_STREAM_SIZE {
            *self.size = 0;
        }

        let page_size = self.page_allocator.page_size;

        // Validate the ranges of our inputs. We validate these now so that we can compute values
        // that depend on them without worrying about overflow.
        let Ok(buf_len) = u32::try_from(buf.len()) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };
        let Ok(mut pos) = u32::try_from(offset) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };
        let Some(_buf_end) = pos.checked_add(buf_len) else {
            return Err(std::io::ErrorKind::InvalidInput.into());
        };

        // Is there any implicit zero extension happening? If so, handle the zero extension now.
        // Note that this may transfer a small prefix of buf, if the end of the zero-extension
        // region is unaligned (i.e. pos is unaligned). If that consumes all of the data in buf,
        // then we finish early.
        if *self.size < pos {
            self.write_zero_extend(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
            assert_eq!(pos, *self.size);
        }

        assert!(!buf.is_empty());
        assert!(pos <= *self.size);

        // Are we doing any overwrite?
        if pos < *self.size {
            self.write_overwrite(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
            assert_eq!(pos, *self.size);
        }

        assert!(!buf.is_empty());
        assert_eq!(pos, *self.size);

        // Does the write position start at an unaligned page boundary?
        if !page_size.is_aligned(pos) {
            self.write_unaligned_start_page(&mut buf, &mut pos)?;
            if buf.is_empty() {
                return Ok(());
            }
        }

        assert!(!buf.is_empty());
        assert_eq!(pos, *self.size);
        assert!(page_size.is_aligned(pos));

        // From this point on, we no longer need to cow pages.
        // All pages that we write will be newly-allocated pages.

        self.write_append_complete_pages(&mut buf, &mut pos)?;
        if buf.is_empty() {
            return Ok(());
        }

        self.write_append_final_unaligned_page(&mut buf, &mut pos)?;
        assert!(buf.is_empty());

        Ok(())
    }
105
    /// Append complete pages to the file.
    ///
    /// Allocates fresh pages, appends them to the stream's page list, and writes as many whole
    /// pages of data from `buf` as possible. Any partial trailing page is left in `buf` for the
    /// caller. `pos` and the stream size advance together.
    ///
    /// # Requires
    /// * `pos == stream_size`
    /// * `pos` is page-aligned
    fn write_append_complete_pages(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        let page_size = self.page_allocator.page_size;

        assert_eq!(*self.size, *pos);
        assert!(page_size.is_aligned(*pos));

        // Append complete pages. We do this iteratively so that we can minimize our calls to
        // alloc_pages(). Each iteration of this loop allocates a contiguous run of pages and
        // uses a single write() call to the lower level for transferring data.
        //
        // This is expected to be the main "hot path" for generating new PDBs (not just editing
        // existing ones). The page allocator should usually give us a long run of pages; it only
        // needs to break up runs when we cross interval boundaries (because of the FPM pages).

        loop {
            // How many whole pages remain in the caller's buffer.
            let num_pages_wanted = (buf.len() as u32) / page_size;
            if num_pages_wanted == 0 {
                break;
            }

            // Allocate pages and add them to our stream before the I/O so that our in-memory
            // page table is consistent with what we are about to write.
            let (first_page, run_len) = self.page_allocator.alloc_pages(num_pages_wanted);
            assert!(run_len > 0);
            let xfer_len: u32 = run_len << page_size.exponent();
            for i in 0..run_len {
                self.pages.push(first_page + i);
            }

            let buf_head = take_n(buf, xfer_len as usize);

            let file_offset = page_to_offset(first_page, page_size);

            trace!(
                stream_pos = *pos,
                first_page = first_page,
                file_offset,
                xfer_len,
                "write_append_complete_pages"
            );

            self.file.write_all_at(buf_head, file_offset)?;

            // Stream size and position advance in lock-step during append.
            *self.size += xfer_len;
            *pos += xfer_len;
        }

        Ok(())
    }
160
    /// Append the final unaligned page to the file, if any.
    ///
    /// `buf` must hold less than one full page of data; the remainder of the newly-allocated
    /// page is zero-filled before being written. Consumes all of `buf`.
    ///
    /// # Requires
    /// * `pos == stream_size`
    /// * `pos` is page-aligned
    /// * `buf.len() < page_size`
    fn write_append_final_unaligned_page(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        let page_size = self.page_allocator.page_size;

        assert_eq!(*self.size, *pos);
        assert!(page_size.is_aligned(*pos));

        // The only thing left is a single partial page at the end. We use the page buffer so that we
        // are sending down complete page writes to the lower-level storage device.
        assert!(buf.len() < usize::from(page_size));
        if buf.is_empty() {
            return Ok(());
        }

        let page = self.page_allocator.alloc_page();

        // Assemble the full page: caller data at the front, zeroes after it.
        let mut page_buffer = self.page_allocator.alloc_page_buffer();
        page_buffer[..buf.len()].copy_from_slice(buf);
        page_buffer[buf.len()..].fill(0);

        // Update in-memory stream state before the I/O.
        self.pages.push(page);
        *self.size += buf.len() as u32;

        let file_offset = page_to_offset(page, page_size);

        trace!(
            stream_pos = *pos,
            page = page,
            file_offset,
            unaligned_len = buf.len(),
            "write_append_final_unaligned_page"
        );

        self.file.write_all_at(&page_buffer, file_offset)?;

        *pos += buf.len() as u32;
        *buf = &[];

        Ok(())
    }
205
206    /// Handles zero-extending a stream. This occurs when the write position is beyond the
207    /// current size of the stream. This implicitly writes zeroes from the old end of the stream
208    /// to the start of the write request.
209    ///
210    /// This implementation will transfer the prefix of `buf` if `pos` is unaligned. If `buf` fits
211    /// entirely on one page, then this finishes the entire transfer. If some portion of `buf` is
212    /// transferred, then it will be written to 1 or 2 pages. It will be written to 2 pages if it
213    /// crosses a page boundary.
214    fn write_zero_extend(&mut self, buf: &mut &[u8], pos: &mut u32) -> std::io::Result<()> {
215        let page_size = self.page_allocator.page_size;
216
217        self.write_zero_extend_unaligned_start(buf, pos)?;
218        if buf.is_empty() {
219            return Ok(());
220        }
221
222        // If we have more bytes to write, then write_zero_extend_unaligned_start() should
223        // have aligned the stream size to a page boundary.
224        assert!(page_size.is_aligned(*self.size));
225
226        if *self.size < *pos {
227            self.write_zero_extend_whole_pages(*pos)?;
228        }
229
230        if *self.size < *pos {
231            self.write_zero_extend_unaligned_end(buf, pos)?;
232        }
233
234        Ok(())
235    }
236
    /// Handles the first (unaligned) page of a zero-extension, i.e. the page containing the
    /// current end of the stream when that end is not page-aligned.
    ///
    /// If the write position lands on this same page, a prefix of `buf` is also consumed here
    /// and the page is assembled as old-data | zeroes | new-data. Otherwise the tail of the page
    /// is simply zeroed. Either way the page is copy-on-written and flushed.
    ///
    /// # Requires
    /// * `stream_size < pos` (caller checked)
    fn write_zero_extend_unaligned_start(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert!(*self.size < *pos); // caller should have already checked this
        let num_zx = *pos - *self.size; // number of zero-extend bytes we need

        let page_size = self.page_allocator.page_size;

        // current end of the stream
        let end_spage: StreamPage = *self.size >> page_size.exponent();
        let end_phase = offset_within_page(*self.size, page_size);

        // where the new data begins
        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size);

        if end_phase != 0 {
            // The end of the stream is not page-aligned and we are extending the end of the stream
            // by one or more bytes.

            // Prepare the page buffer that we are assembling. Zero-fill it.
            let mut page_buffer = self.page_allocator.alloc_page_buffer();
            page_buffer.fill(0);

            // Read the old data from the page. The read starts at phase 0 within the page and ends
            // at end_phase. If this fails, that's OK, we haven't made any state changes yet and the
            // error propagates.
            {
                let file_page = self.pages[end_spage as usize];
                let file_offset = page_to_offset(file_page, page_size);
                self.file
                    .read_exact_at(&mut page_buffer[..end_phase as usize], file_offset)?;
            }

            if end_spage == pos_spage {
                // The stream ends on the same page that new-data begins on, and the end of
                // the stream is unaligned. This means that we have a very complicated page to
                // deal with. It has old stream data, zero-extend bytes, and 1 or more bytes of
                // new data. We also have to deal with copy-on-write for the page.
                //
                // We expect zero-extending to be a rare case. We implement this by allocating a
                // page buffer, reading the unaligned piece of the old page, zeroing the middle,
                // copying the unaligned piece of the new data, and optionally zeroing the tail.
                // Then we allocate a fresh page (if needed) and write the page data that we built.
                // Then we write the page to disk and update the stream page pointer.
                //
                // There are two subcases to consider:
                // 1) the new data does not reach the end of this page (is unaligned), so there are
                //    some undefined bytes at the end of the page
                // 2) the new data reaches or crosses the end of this page (is aligned), so we
                //    "paint" the entire page to its end with new data.

                // within end_spage:
                // |------old-data-------|-------zeroes------|-------new-data-----|----
                //                       |                   |
                //               end_phase                   |
                //                                           |
                //                                           pos_phase

                assert!(end_phase <= pos_phase);

                // Copy the new data into the page buffer. The new data may end within this page
                // or may cross the boundary to the next page, which is what the min() call handles.
                let buf_head_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
                let buf_head = take_n(buf, buf_head_len);
                page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);

                // Move pos because we have consumed data from 'buf'
                *pos += buf_head_len as u32;

                // In this case, all of the zero-extend bytes have been handled in this first page,
                // so the stream size advances by num_zx plus the data we just consumed.
                *self.size += num_zx;
                *self.size += buf_head_len as u32;
            } else {
                // The new data does not overlap the page we are working on. That means the
                // zero-extend region reaches to the end of the page.

                // within end_spage:
                // |------old-data-------|-------zeroes-------------------------------|
                //                       |                                            |
                //               end_phase                                            |
                //                                                                    |
                //                                                               page_size

                let num_zx_this_page = u32::from(page_size) - end_phase;
                *self.size += num_zx_this_page;
            }

            // COW the page and write it.
            self.cow_page_and_write(end_spage, &page_buffer)?;
        }

        Ok(())
    }
334
335    /// Writes zero or more complete zero pages during zero-extension. The size of the stream has
336    /// already been aligned to a page boundary.
337    ///
338    /// This does not read data from the current transfer request, so it does not need `buf`.
339    /// It also does not change `pos`.
340    fn write_zero_extend_whole_pages(&mut self, pos: u32) -> std::io::Result<()> {
341        let page_size = self.page_allocator.page_size;
342
343        assert!(*self.size <= pos);
344        assert!(page_size.is_aligned(*self.size));
345
346        if (pos - *self.size) / page_size == 0 {
347            return Ok(());
348        }
349
350        let mut page_buffer = self.page_allocator.alloc_page_buffer();
351        page_buffer.fill(0); // probably redundant
352
353        loop {
354            let num_pages_wanted = (pos - *self.size) / page_size;
355            if num_pages_wanted == 0 {
356                break;
357            }
358
359            let (first_page, run_len) = self.page_allocator.alloc_pages(num_pages_wanted);
360            assert!(run_len > 0);
361            for i in 0..run_len {
362                self.pages.push(first_page + i);
363            }
364
365            let run_size_bytes = run_len << page_size.exponent();
366            *self.size += run_size_bytes;
367
368            assert!(*self.size <= pos);
369
370            // Write the zeroed pages.
371            for i in 0..run_len {
372                let page = first_page + i;
373                self.file
374                    .write_at(&page_buffer, page_to_offset(page, page_size))?;
375            }
376        }
377
378        Ok(())
379    }
380
381    /// If the zero-extend region ends with an unaligned final page, then this will write that page.
382    /// This may transfer data from `buf`.
383    fn write_zero_extend_unaligned_end(
384        &mut self,
385        buf: &mut &[u8],
386        pos: &mut u32,
387    ) -> std::io::Result<()> {
388        let page_size = self.page_allocator.page_size;
389
390        assert!(*self.size <= *pos);
391        assert!(page_size.is_aligned(*self.size));
392
393        // We should have at most a partial page.
394        let num_zx_bytes = *pos - *self.size;
395        assert!(num_zx_bytes < u32::from(page_size));
396
397        if num_zx_bytes == 0 {
398            return Ok(());
399        }
400
401        let mut page_buffer = self.page_allocator.alloc_page_buffer();
402
403        page_buffer[0..num_zx_bytes as usize].fill(0);
404
405        let num_data_len = buf
406            .len()
407            .min(usize::from(page_size) - num_zx_bytes as usize);
408        let buf_head = take_n(buf, num_data_len);
409        *pos += num_data_len as u32;
410
411        page_buffer[num_zx_bytes as usize..num_zx_bytes as usize + num_data_len]
412            .copy_from_slice(buf_head);
413
414        let end_of_data = num_zx_bytes as usize + num_data_len;
415        page_buffer[end_of_data..usize::from(page_size)].fill(0);
416
417        let page = self.page_allocator.alloc_page();
418        self.pages.push(page);
419        *self.size += end_of_data as u32;
420
421        self.file
422            .write_at(&page_buffer, page_to_offset(page, page_size))?;
423
424        Ok(())
425    }
426
427    /// Handles _most_ of overwrite.
428    ///
429    /// This handles:
430    /// * the unaligned page at the start of overwrite (if any)
431    /// * the complete, aligned pages in the middle of overwrite (if any)
432    ///
433    /// When this returns, even if buf still has data, we do not guarantee that pos == stream_size.
434    fn write_overwrite(&mut self, buf: &mut &[u8], pos: &mut u32) -> std::io::Result<()> {
435        assert!(*pos < *self.size);
436
437        self.write_overwrite_unaligned_start(buf, pos)?;
438        if buf.is_empty() {
439            return Ok(());
440        }
441
442        assert!(self.page_allocator.page_size.is_aligned(*pos));
443
444        self.write_overwrite_aligned_pages(buf, pos)?;
445        if buf.is_empty() {
446            return Ok(());
447        }
448
449        assert!(self.page_allocator.page_size.is_aligned(*pos));
450        assert!(self.page_allocator.page_size.is_aligned(*self.size));
451        Ok(())
452    }
453
    /// Handles writing the first page during overwrite, if the first page is unaligned.
    ///
    /// Reads the whole existing page, splices the new data into it at `pos`'s phase, then
    /// copy-on-writes the page back to disk. May extend the stream size (but never across a
    /// page boundary).
    ///
    /// # Requires
    /// * `pos <= stream_size`
    ///
    /// # Ensures
    /// * `buf.is_empty() || pos == stream_size`
    fn write_overwrite_unaligned_start(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert!(*pos <= *self.size);

        let page_size = self.page_allocator.page_size;

        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size);
        if pos_phase == 0 {
            // The overwrite starts on an aligned page boundary. This function has no work to do.
            return Ok(());
        }

        // In this case, we need to assemble a page from some old data and some new data.
        // And because we need to cow the page, it is easier to reassemble everything into a
        // page buffer.
        //
        // Contents of this page:
        //
        //
        //
        //        not empty            not empty             can be empty          can be empty
        // |----old-data----------|------new-data--------|-------old-data------|-----garbage-----
        // 0                      |                      |                     |
        //                        |                      |                     |
        //                  pos_phase
        //
        // At this point, we don't know which subcase we're in. It depends on whether new_data
        // reaches the end of this page and whether writing the new-data extends the stream size.

        // Read the old page data into our page buffer. This makes cow'ing the page easier.
        // We read the entire page because that simplifies the case where the new-data ends
        // before stream_size.
        let mut page_buffer = self.page_allocator.alloc_page_buffer();
        self.read_page(pos_spage, &mut page_buffer)?;

        // Copy the data from 'buf' (new-data) into the page and advance 'buf' and 'pos'.
        let new_data_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
        assert!(new_data_len > 0);
        let buf_head = take_n(buf, new_data_len);
        page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);
        *pos += new_data_len as u32;

        // Cow the page and write its new contents.
        self.cow_page_and_write(pos_spage, &page_buffer)?;

        // We may have written enough data that we extended stream_size. This only happens if we
        // drag in some of the prefix of buf.
        if *pos > *self.size {
            *self.size = *pos;
        }

        Ok(())
    }
518
    /// If we are doing overwrite and the remaining buffer contains one or more whole pages,
    /// then this function transfers those.
    ///
    /// This function may extend the stream size. If it does, then it will not extend it enough
    /// to cross a page boundary. This form of extension happens when we are overwriting beyond
    /// the existing end of the stream.
    ///
    /// This function will cow pages, but will not allocate new page slots in the stream.
    ///
    /// # Requires
    /// * `pos` is page-aligned
    /// * `pos <= stream_size`
    ///
    /// # Ensures
    /// * `pos` is page-aligned
    /// * `buf.is_empty() || stream_size is page-aligned`
    fn write_overwrite_aligned_pages(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        let page_size = self.page_allocator.page_size;
        assert!(*pos <= *self.size);
        assert!(page_size.is_aligned(*pos));

        if *pos == *self.size {
            return Ok(());
        }

        // The stream page where this transfer will begin (if any).
        let pos_spage = *pos / page_size;

        // Number of complete pages that can be read from buf.
        let num_buf_pages = buf.len() as u32 / page_size;

        // Number of pages at our write position that are assigned to the stream.
        // This includes the partial page at the end, if any.
        let num_pages_total = self.size.div_round_up(page_size);
        assert_eq!(num_pages_total, self.pages.len() as u32);
        let num_pages_at_pos = *pos / page_size;
        let num_pages_owned = num_pages_total - num_pages_at_pos;

        // Number of complete pages we are going to transfer from buf to disk.
        // Since we are writing whole pages, we do not need the old contents of any page.
        // Cow the pages, just so we get fresh pages.
        let num_xfer_pages = num_pages_owned.min(num_buf_pages);
        if num_xfer_pages != 0 {
            trace!(num_pages = num_xfer_pages, "writing whole pages");

            let num_xfer_bytes = num_xfer_pages << page_size.exponent();
            let buf_head = take_n(buf, num_xfer_bytes as usize);
            *pos += num_xfer_bytes;

            let pages = &mut self.pages[pos_spage as usize..][..num_xfer_pages as usize];
            self.page_allocator.make_pages_fresh(pages);
            write_runs(&self.file, buf_head, pages, page_size)?;

            // If the last page that we overwrite was a partial page, then we will have extended
            // the size of the stream. This will not extend stream_size beyond a page boundary.
            if *pos > *self.size {
                *self.size = *pos;
            }

            if buf.is_empty() {
                return Ok(());
            }
        }

        assert!(page_size.is_aligned(*pos));
        assert!(*pos <= *self.size);

        // We may have gotten here because buf.len() is now less than a full page size, but we still
        // have another page assigned in the stream. Cow it now.
        if *self.size - *pos > 0 {
            trace!(buf_len = buf.len(), "buffer has partial page remaining.");
            assert!(
                buf.len() < usize::from(page_size),
                "size = {:x}, pos = {:x}, buf.len = 0x{:x}",
                *self.size,
                *pos,
                buf.len()
            );

            let spage = *pos / page_size;
            let old_len = *self.size - *pos;

            // Read-modify-write: only read the old page when it holds bytes beyond the new data.
            // NOTE(review): when old_len <= buf.len(), the tail of page_buffer past buf.len() is
            // written without being filled from the old page; those bytes are past the (new) end
            // of the stream — confirm alloc_page_buffer's contents are acceptable to persist.
            let mut page_buffer = self.page_allocator.alloc_page_buffer();
            if old_len > buf.len() as u32 {
                self.read_page(spage, &mut page_buffer)?;
            }

            page_buffer[..buf.len()].copy_from_slice(buf);

            self.cow_page_and_write(spage, &page_buffer)?;
            *pos += buf.len() as u32;
            if *pos > *self.size {
                *self.size = *pos;
            }

            *buf = &[];
            return Ok(());
        }

        assert_eq!(*pos, *self.size);

        Ok(())
    }
626
    /// Writes the unaligned start page at the beginning of the new-data. This page is only
    /// present if `pos` is not aligned.
    ///
    /// Reads the whole existing page, splices the new data in at `pos`'s phase, then
    /// copy-on-writes the page back. May extend the stream size.
    ///
    /// # Requires
    /// * `stream_size == pos`
    /// * `pos` is not page-aligned (caller checked)
    fn write_unaligned_start_page(
        &mut self,
        buf: &mut &[u8],
        pos: &mut u32,
    ) -> std::io::Result<()> {
        assert_eq!(*pos, *self.size);

        // In this case, we need to assemble a page from some old data and some new data.
        // And because we need to cow the page, it is easier to reassemble everything into a
        // page buffer.
        //
        // Contents of this page:
        //
        //
        //
        //        not empty            not empty             can be empty          can be empty
        // |----old-data----------|------new-data--------|-------old-data------|-----garbage-----
        // 0                      |                      |                     |
        //                        |                      |                     |
        //                  pos_phase
        //
        // At this point, we don't know which subcase we're in. It depends on whether new_data
        // reaches the end of this page and whether writing the new-data extends the stream size.

        let page_size = self.page_allocator.page_size;

        // where the new data begins
        let pos_spage: StreamPage = *pos >> page_size.exponent();
        let pos_phase = offset_within_page(*pos, page_size); // <-- we know this is non-zero

        // Read the old page data into our page buffer. This makes cow'ing the page easier.
        // We read the entire page because that simplifies the case where the new-data ends
        // before stream_size.
        let mut page_buffer = self.page_allocator.alloc_page_buffer();

        let file_offset = page_to_offset(self.pages[pos_spage as usize], page_size);
        trace!(
            stream_pos = *pos,
            file_offset,
            len = u32::from(page_size),
            "write_unaligned_start_page: reading existing unaligned data"
        );

        self.file.read_exact_at(&mut page_buffer, file_offset)?;

        // Copy the data from 'buf' (new-data) into the page and advance 'buf' and 'pos'.
        let new_data_len = (usize::from(page_size) - pos_phase as usize).min(buf.len());
        assert!(new_data_len > 0);
        let buf_head = take_n(buf, new_data_len);
        page_buffer[pos_phase as usize..][..buf_head.len()].copy_from_slice(buf_head);
        *pos += new_data_len as u32;

        // Cow the page and write its new contents.
        self.cow_page_and_write(pos_spage, &page_buffer)?;

        // We may have written enough data that we extended stream_size.
        if *pos > *self.size {
            *self.size = *pos;
        }

        Ok(())
    }
694
695    /// Ensures that a stream page is writable (is "fresh"). If the page is not writable, then it
696    /// allocates a new page. This function returns the page number of the writable page.
697    fn cow_page(&mut self, spage: StreamPage) -> Page {
698        self.page_allocator
699            .make_page_fresh(&mut self.pages[spage as usize])
700    }
701
702    /// Ensures that a stream page is writable and then writes it.
703    ///
704    /// `data` should contain at most one page of data.
705    pub(super) fn cow_page_and_write(
706        &mut self,
707        spage: StreamPage,
708        data: &[u8],
709    ) -> std::io::Result<()> {
710        // At most one full page of data can be written.
711        debug_assert!(
712            data.len() <= usize::from(self.page_allocator.page_size),
713            "buffer cannot exceed size of a single page"
714        );
715
716        let page = self.cow_page(spage);
717        let file_offset = page_to_offset(page, self.page_allocator.page_size);
718
719        trace!(
720            stream_page = spage,
721            file_offset,
722            len = u32::from(self.page_allocator.page_size),
723            "cow_page_and_write"
724        );
725
726        self.file.write_all_at(data, file_offset)
727    }
728
729    /// Reads a stream page.  The length of `data` must be exactly one page, or less. It cannot
730    /// cross page boundaries.
731    pub(super) fn read_page(
732        &self,
733        stream_page: StreamPage,
734        data: &mut [u8],
735    ) -> std::io::Result<()> {
736        debug_assert!(
737            data.len() <= usize::from(self.page_allocator.page_size),
738            "buffer cannot exceed size of a single page"
739        );
740
741        let page = self.pages[stream_page as usize];
742        let offset = page_to_offset(page, self.page_allocator.page_size);
743        self.file.read_exact_at(data, offset)
744    }
745
746    /// The caller **must** guarantee that this page is already writable.
747    pub(super) fn write_page(&self, stream_page: StreamPage, data: &[u8]) -> std::io::Result<()> {
748        let page = self.pages[stream_page as usize];
749        assert!(
750            self.page_allocator.fresh[page as usize],
751            "page is required to be fresh (writable)"
752        );
753
754        let file_offset = page_to_offset(page, self.page_allocator.page_size);
755
756        trace!(
757            stream_page,
758            page = page,
759            file_offset,
760            len = u32::from(self.page_allocator.page_size),
761            "write_page"
762        );
763
764        self.file.write_all_at(data, file_offset)
765    }
766}
767
768/// Finds the length of the prefix of pages in `pages` that are numbered sequentially.
769///
770/// This function assumes that there are no entries with the value 0xffff_ffff. (That would cause
771/// overflow.)
772fn find_longest_page_run(pages: &[Page]) -> usize {
773    if pages.is_empty() {
774        0
775    } else {
776        let mut prev = pages[0];
777        let mut i = 1;
778        while i < pages.len() && pages[i] == prev + 1 {
779            prev = pages[i];
780            i += 1;
781        }
782        i
783    }
784}
785
#[test]
fn test_find_longest_page_run() {
    // Empty input has no run.
    assert_eq!(find_longest_page_run(&[]), 0);
    // A single page is trivially a run of length 1.
    assert_eq!(find_longest_page_run(&[1]), 1);
    // A fully-sequential slice is one run.
    assert_eq!(find_longest_page_run(&[1, 2, 3]), 3);
    // The run ends at the first out-of-order page.
    assert_eq!(find_longest_page_run(&[1, 3, 2]), 1);
    assert_eq!(find_longest_page_run(&[1, 2, 3, 9, 9, 9]), 3);
    // Descending pages are never sequential.
    assert_eq!(find_longest_page_run(&[5, 4, 3]), 1);
    // A run may begin at any page number, not just small ones.
    assert_eq!(find_longest_page_run(&[100, 101]), 2);
}
794
795/// Given a page map that corresponds to a buffer of data to write, write all of the data.
796/// Write it in a sequence of function calls that group together consecutive pages, so that
797/// we minimize the number of write() calls.
798fn write_runs<F: WriteAt>(
799    file: &F,
800    mut buf: &[u8],
801    pages: &[Page],
802    page_size: PageSize,
803) -> std::io::Result<()> {
804    let mut region_pages = pages;
805
806    assert_eq!(buf.len(), pages.len() << page_size.exponent());
807
808    loop {
809        let run_len = find_longest_page_run(region_pages);
810        if run_len == 0 {
811            break;
812        }
813
814        let page0: Page = region_pages[0];
815        let xfer_len: usize = run_len << page_size.exponent();
816        let buf_head = take_n(&mut buf, xfer_len);
817
818        // Write the run of pages. If this write fails, then the contents of the stream are
819        // now in an undefined state.
820        file.write_at(buf_head, page_to_offset(page0, page_size))?;
821
822        // Advance iterator state
823        region_pages = &region_pages[run_len..];
824    }
825
826    Ok(())
827}
828
829impl<F> Msf<F> {
830    /// Adds a new stream to the MSF file. The stream has a length of zero.
831    pub fn new_stream(&mut self) -> anyhow::Result<(u32, StreamWriter<'_, F>)> {
832        let _span = trace_span!("new_stream").entered();
833
834        self.requires_writeable()?;
835        self.check_can_add_stream()?;
836
837        let new_stream_index = self.stream_sizes.len() as u32;
838        trace!(new_stream_index);
839
840        self.stream_sizes.push(0);
841        let size = self.stream_sizes.last_mut().unwrap();
842
843        let pages = match self.modified_streams.entry(new_stream_index) {
844            Entry::Occupied(_) => {
845                panic!("Found entry in modified streams table that should not be present.")
846            }
847            Entry::Vacant(v) => v.insert(Vec::new()),
848        };
849
850        Ok((
851            new_stream_index,
852            StreamWriter {
853                stream: new_stream_index,
854                file: &self.file,
855                size,
856                page_allocator: &mut self.pages,
857                pos: 0,
858                pages,
859            },
860        ))
861    }
862
863    fn check_can_add_stream(&self) -> anyhow::Result<()> {
864        if self.stream_sizes.len() as u32 >= self.max_streams {
865            bail!("A new stream cannot be created because the maximum number of streams has been reached.");
866        }
867        Ok(())
868    }
869
870    /// Adds a new stream to the MSF file, given the byte contents. This function returns the
871    /// stream index of the new stream.
872    pub fn new_stream_data(&mut self, data: &[u8]) -> anyhow::Result<u32>
873    where
874        F: ReadAt + WriteAt,
875    {
876        let (stream_index, mut writer) = self.new_stream()?;
877        writer.set_contents(data)?;
878        Ok(stream_index)
879    }
880
881    /// Adds a new nil stream to the MSF file.
882    pub fn nil_stream(&mut self) -> anyhow::Result<u32> {
883        self.requires_writeable()?;
884        self.check_can_add_stream()?;
885
886        let new_stream_index = self.stream_sizes.len() as u32;
887        self.stream_sizes.push(NIL_STREAM_SIZE);
888
889        match self.modified_streams.entry(new_stream_index) {
890            Entry::Occupied(_) => {
891                panic!("Found entry in modified streams table that should be present.")
892            }
893            Entry::Vacant(v) => {
894                v.insert(Vec::new());
895            }
896        }
897
898        Ok(new_stream_index)
899    }
900
901    /// Given the stream index for a stream, returns a `StreamWriter` that allows read/write
902    /// for the stream.
903    ///
904    /// If `stream` is out of range for the current set of streams, then the set of streams is
905    /// increased until `stream` is in range. For example, if a new MSF file is created, then
906    /// it is legal to immediately call `msf.write_stream(10)` on it. This will expand the Stream
907    /// Directory so that `num_streams()` returns 11 (because it must include the new stream index).
908    /// All streams lower than `stream` will be allocated as nil streams.
909    ///
910    /// If `stream` is currently a nil stream, then this function promotes the stream to a
911    /// non-nil stream.
912    pub fn write_stream(&mut self, stream: u32) -> anyhow::Result<StreamWriter<'_, F>> {
913        assert!(stream <= MAX_STREAM);
914        self.requires_writeable()?;
915
916        while (self.stream_sizes.len() as u32) <= stream {
917            _ = self.nil_stream()?;
918        }
919
920        let Some(size) = self.stream_sizes.get_mut(stream as usize) else {
921            bail!("Stream index is out of range");
922        };
923
924        // If the stream is currently a nil stream, then promote it to a zero-length stream.
925        if *size == NIL_STREAM_SIZE {
926            *size = 0;
927        }
928
929        let pages = match self.modified_streams.entry(stream) {
930            Entry::Occupied(occ) => occ.into_mut(),
931            Entry::Vacant(v) => {
932                // Copy the existing page list to a new page list.
933                //
934                // Copying the page list _does not_ imply that we can safely write to those pages,
935                // because they may still be owned by the previous committed state. Copy-on-write
936                // is handled elsewhere.
937                let starts = &self.committed_stream_page_starts[stream as usize..];
938                let old_pages =
939                    &self.committed_stream_pages[starts[0] as usize..starts[1] as usize];
940                v.insert(old_pages.to_vec())
941            }
942        };
943
944        Ok(StreamWriter {
945            stream,
946            file: &self.file,
947            size,
948            page_allocator: &mut self.pages,
949            pos: 0,
950            pages,
951        })
952    }
953
954    pub(crate) fn requires_writeable(&self) -> anyhow::Result<()> {
955        match self.access_mode {
956            AccessMode::ReadWrite => Ok(()),
957            AccessMode::Read => bail!("This PDB was not opened for read/write access."),
958        }
959    }
960
961    /// Copies a stream from another PDB/MSF into this one.
962    pub fn copy_stream<Input: ReadAt>(
963        &mut self,
964        source: &Msf<Input>,
965        source_stream: u32,
966    ) -> anyhow::Result<u32>
967    where
968        F: ReadAt + WriteAt,
969    {
970        const BUFFER_LEN: usize = 16 << 20; // 16 MiB
971
972        let mut source_reader = source.get_stream_reader(source_stream)?;
973        let source_len = source_reader.len();
974
975        let mut buffer = vec![0; (source_len as usize).min(BUFFER_LEN)];
976        let (dest_stream_index, mut dest_writer) = self.new_stream()?;
977
978        loop {
979            let n = source_reader.read(&mut buffer)?;
980            if n == 0 {
981                break;
982            }
983
984            dest_writer.write_all(&buffer[..n])?;
985        }
986
987        Ok(dest_stream_index)
988    }
989
990    /// Copies a stream that implements `Read` into this PDB/MSF file.
991    pub fn copy_stream_read<Input: Read>(&mut self, source: &mut Input) -> anyhow::Result<u32>
992    where
993        F: ReadAt + WriteAt,
994    {
995        const BUFFER_LEN: usize = 16 << 20; // 16 MiB
996
997        let mut buffer = vec![0; BUFFER_LEN];
998        let (dest_stream_index, mut dest_writer) = self.new_stream()?;
999
1000        loop {
1001            let n = source.read(&mut buffer)?;
1002            if n == 0 {
1003                break;
1004            }
1005
1006            dest_writer.write_all(&buffer[..n])?;
1007        }
1008
1009        Ok(dest_stream_index)
1010    }
1011
1012    /// Copies a stream that implements `ReadAt` into this PDB/MSF file.
1013    pub fn copy_stream_read_at<Input: ReadAt>(&mut self, source: &Input) -> anyhow::Result<u32>
1014    where
1015        F: ReadAt + WriteAt,
1016    {
1017        const BUFFER_LEN: usize = 16 << 20; // 16 MiB
1018
1019        let mut buffer = vec![0; BUFFER_LEN];
1020        let (dest_stream_index, mut dest_writer) = self.new_stream()?;
1021
1022        let mut pos: u64 = 0;
1023
1024        loop {
1025            let n = source.read_at(&mut buffer, pos)?;
1026            if n == 0 {
1027                break;
1028            }
1029
1030            dest_writer.write_all(&buffer[..n])?;
1031            pos += n as u64;
1032        }
1033
1034        Ok(dest_stream_index)
1035    }
1036}
1037
/// Splits a slice `items` at a given index `n`. The slice is modified to point to the items
/// after `n`. The function returns the items up to `n`.
///
/// Panics if `n > items.len()`.
fn take_n<'a, T>(items: &mut &'a [T], n: usize) -> &'a [T] {
    let taken = &items[..n];
    *items = &items[n..];
    taken
}