// ms_pdb_msf/commit.rs

//! Commits all pending changes to an MSF file.

use super::*;
use anyhow::Result;
use tracing::{debug, debug_span, info, info_span, trace, trace_span};
impl<F: ReadAt + WriteAt> Msf<F> {
    /// Commits all changes to the MSF file to disk.
    ///
    /// The MSF file format is designed to support a very limited form of transactional commits.
    /// A single block write to the first page (located at the start of the file) can atomically
    /// commit all outstanding modifications to the MSF file.
    ///
    /// This commit design does require that the underlying operating system handle the write to
    /// page 0 in a single operation.  Operating systems generally don't _contractually_
    /// guarantee atomicity of such writes, but in practice it's true enough to be reasonably
    /// reliable. The commit functionality of MSF is really designed to guard against application
    /// failures, not failures of the operating system or its storage stack. So don't rely on
    /// `commit()` for anything more than best-effort service.
    ///
    /// This `commit()` implementation _does not_ permit multiple concurrent writers (or concurrent
    /// readers and writers) to the underlying MSF/PDB file. If you manage to circumvent this,
    /// you may damage the underlying MSF/PDB file.
    ///
    /// This `commit()` implementation does not buffer modifications to stream contents. All
    /// writes to stream contents are handled by immediately writing the data to the underlying
    /// MSF file. However, these writes do not _overwrite_ the data stored in the stream; instead,
    /// new pages are allocated in the file for the new or modified stream contents. If an
    /// application deletes a range of a stream (by resizing it), then the existing pages are
    /// protected and cannot be overwritten until `commit()` is called.
    ///
    /// Commit operations are moderately expensive. Applications that modify PDBs should generally
    /// perform a single commit operation, not a series of commit operations. Each call to
    /// `commit()` writes a new copy of the Stream Directory (the list of streams and their page
    /// numbers) to disk. Because the existing Stream Directory cannot be overwritten (until
    /// _after_ the `commit()` operation completes), this means that all `commit()` operations
    /// require that two complete copies of the Stream Directory persist on disk.
    ///
    /// While it would be possible to reduce the size of an MSF file (after a commit completes) by
    /// trimming the unused pages at the end of the MSF file, in practice this does not help much
    /// because of page fragmentation. For this reason, `commit()` never reduces the size of the
    /// underlying MSF file.
    ///
    /// Returns `Ok(true)` if this `Msf` contained uncommitted changes and these changes have now
    /// been committed.
    ///
    /// Returns `Ok(false)` if this `Msf` did not contain any uncommitted changes. In this case,
    /// no `write()` calls are issued to the underlying storage.
    ///
    /// If this function returns `Err`, then the underlying MSF file may have had new pages written,
    /// but the existing Stream Directory and header page should be intact. However, if the
    /// underlying operating system did not write Page 0 atomically, then the underlying MSF
    /// file may be irrecoverably damaged.
    ///
    /// Also, if this function returns `Err`, the in-memory data structures that represent the
    /// state of the `Msf` editor are not guaranteed to be in a consistent state.
    pub fn commit(&mut self) -> Result<bool> {
        let _span = info_span!("Msf::commit").entered();

        self.assert_invariants();

        // If this was not opened for write access then there are no pending changes at all.
        if self.access_mode != AccessMode::ReadWrite {
            info!("this Msf is not opened for read-write access");
            debug_assert!(self.modified_streams.is_empty());
            return Ok(false);
        };

        // We only support modifying Big MSF files.
        assert_eq!(self.kind, MsfKind::Big);

        // If no streams have been modified, then there is nothing to do.
        if self.modified_streams.is_empty() {
            info!("there are no modified streams; nothing to commit");
            return Ok(false);
        }

        // Each commit flips between the two Free Page Map copies: the new allocation state
        // is written to the FPM that is *not* currently active, and Page 0 (written below)
        // switches the file over to it.
        let new_fpm_number: u32 = match self.active_fpm {
            FPM_NUMBER_1 => FPM_NUMBER_2,
            FPM_NUMBER_2 => FPM_NUMBER_1,
            _ => panic!("Active FPM has invalid value"),
        };
        info!(
            old_fpm = self.active_fpm,
            new_fpm = new_fpm_number,
            "beginning commit"
        );

        // Write the new Stream Directory (and its page map) to freshly-allocated pages.
        // This can fail, but failure here leaves the old directory and Page 0 untouched.
        let stream_dir_info = self.write_new_stream_dir()?;

        // NOTE: The call to merge_freed_into_free irreversibly alters state. If we fail after
        // this point, then Msf will be left in an inconsistent state. This could be improved by
        // building a new FPM vector in-memory without modifying any state.
        self.pages.merge_freed_into_free();
        fill_last_word_of_fpm(&mut self.pages.fpm);

        self.write_fpm(new_fpm_number)?;

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);

        // Build the new Page 0.
        let mut page0: Vec<u8> = vec![0; page_size_usize];

        let msf_header = MsfHeader {
            magic: MSF_BIG_MAGIC,
            page_size: U32::new(u32::from(page_size)),
            active_fpm: U32::new(new_fpm_number),
            num_pages: U32::new(self.pages.num_pages),
            stream_dir_size: U32::new(stream_dir_info.dir_size),
            stream_dir_small_page_map: U32::new(0),
            // The stream directory page map pointers follow the MsfHeader.
        };
        msf_header.write_to_prefix(page0.as_mut_slice()).unwrap();

        // Copy the stream dir page map into Page 0.
        let page_map_pages_bytes = stream_dir_info.map_pages.as_bytes();
        page0[STREAM_DIR_PAGE_MAP_FILE_OFFSET as usize..][..page_map_pages_bytes.len()]
            .copy_from_slice(page_map_pages_bytes);

        // ------------------------ THE BIG COMMIT ----------------------

        // Writing Page 0 is the single (assumed-atomic) operation that makes all of the
        // pages written above become the live file contents.
        info!("writing MSF File Header");
        self.file.write_all_at(page0.as_bytes(), 0)?;

        // After this point, _nothing can fail_.
        // Any operation that could have failed should have been moved above the commit point.

        // --------------------- CLEANUP AFTER THE COMMIT ---------------

        // Update in-memory state to reflect the commit.
        //
        // This code runs after we write the new Page 0 to disk. That commits the changes to the
        // PDB. This function modifies in-memory state to reflect the successful commit. For this
        // reason, after this point, this function must NEVER return a failure code.
        {
            // Build the new in-memory stream directory. This is very similar to the version that we
            // just wrote to disk, so maybe we should unify the two.

            let _span = trace_span!("post_commit").entered();

            let page_size = self.pages.page_size;

            // We can easily determine the right size for allocating 'stream_pages'.
            let mut num_stream_pages: usize = 0;
            for &stream_size in self.stream_sizes.iter() {
                if stream_size != NIL_STREAM_SIZE {
                    num_stream_pages += num_pages_for_stream_size(stream_size, page_size) as usize;
                }
            }

            let mut stream_pages: Vec<Page> = Vec::with_capacity(num_stream_pages);
            let mut stream_page_starts: Vec<u32> = Vec::with_capacity(self.stream_sizes.len() + 1);

            for (stream, &stream_size) in self.stream_sizes.iter().enumerate() {
                stream_page_starts.push(stream_pages.len() as u32);

                if stream_size == NIL_STREAM_SIZE {
                    trace!(stream, "stream is nil");
                    continue;
                }

                let num_stream_pages = num_pages_for_stream_size(stream_size, page_size) as usize;

                // If this stream has been modified, then return the modified page list.
                let is_modified;
                let pages: &[Page] =
                    if let Some(pages) = self.modified_streams.get(&(stream as u32)) {
                        is_modified = true;
                        pages
                    } else {
                        is_modified = false;
                        let start = self.committed_stream_page_starts[stream] as usize;
                        &self.committed_stream_pages[start..start + num_stream_pages]
                    };
                assert_eq!(num_stream_pages, pages.len());

                trace!(stream, stream_size, num_stream_pages, is_modified);

                stream_pages.extend_from_slice(pages);
            }

            stream_page_starts.push(stream_pages.len() as u32);

            // Now that we have written the Stream Directory (and the map pages, above it), we
            // need to mark the pages that contain the Stream Directory (and the map pages) as
            // *freed*.  Not free, but *freed*.  Fortunately, we still have this information, since
            // we built it above when we called write_new_stream_dir().
            //
            // The multiple_commits() and many_commits() tests verify this.
            {
                trace!("marking stream dir pages as free");

                for list in [
                    stream_dir_info.dir_pages.as_slice(),
                    stream_dir_info.map_pages.as_slice(),
                ] {
                    for &p in list.iter() {
                        let pi = p.get() as usize;
                        // The new directory pages were just allocated, so they must be neither
                        // "freed" nor "free" yet.
                        assert!(!self.pages.fpm_freed[pi]);
                        assert!(!self.pages.fpm[pi]);
                        self.pages.fpm_freed.set(pi, true);
                    }
                }
            }

            // Update state
            self.committed_stream_pages = stream_pages;
            self.committed_stream_page_starts = stream_page_starts;
            self.modified_streams.clear();

            self.pages.fresh.set_elements(0);
            self.pages.next_free_page_hint = 3; // positioned after file header and FPM1 and FPM2

            trace!(new_fpm_number, "setting active FPM");
            self.active_fpm = new_fpm_number;
        }

        info!("commit complete");

        self.assert_invariants();

        Ok(true)
    }

    /// Builds the new stream directory.
    ///
    /// The returned vector has the Stream Directory's on-disk layout, as a sequence of
    /// little-endian `u32` values: the number of streams, then one size entry per stream,
    /// then the concatenated page-number lists of every non-nil stream, in stream order.
    ///
    /// NOTE(review): the size entry for Stream 0 is written as 0 rather than
    /// `self.stream_sizes[0]`, yet the page-list loop below still emits Stream 0's pages
    /// based on its actual size — presumably this matches MSPDB's handling of the
    /// "old stream directory" stream; confirm against the reference implementation.
    fn build_new_stream_dir(&self) -> Vec<U32<LE>> {
        let page_size = self.pages.page_size;

        let num_streams = self.stream_sizes.len();

        let mut stream_dir: Vec<U32<LE>> = Vec::new();
        stream_dir.push(U32::new(num_streams as u32));

        // Push a size of 0 for Stream 0.
        stream_dir.push(U32::new(0));

        for &stream_size in self.stream_sizes[1..].iter() {
            stream_dir.push(U32::new(stream_size));
        }

        // Emit the page lists. Modified streams take their pages from `modified_streams`;
        // unmodified streams reuse their committed page lists.
        for (stream, &stream_size) in self.stream_sizes.iter().enumerate() {
            if stream_size == NIL_STREAM_SIZE {
                debug!(stream, "stream is nil");
                continue;
            }

            let num_stream_pages = num_pages_for_stream_size(stream_size, page_size) as usize;

            // If this stream has been modified, then return the modified page list.
            let pages: &[Page] = if let Some(pages) = self.modified_streams.get(&(stream as u32)) {
                pages
            } else {
                let start = self.committed_stream_page_starts[stream] as usize;
                &self.committed_stream_pages[start..start + num_stream_pages]
            };
            assert_eq!(num_stream_pages, pages.len());
            debug!(stream, stream_size);

            stream_dir.reserve(pages.len());
            for &p in pages.iter() {
                stream_dir.push(U32::new(p));
            }
        }

        stream_dir
    }

    /// Builds the new stream directory and writes it to disk.
    ///
    /// This builds the stream directory and the page map pages and writes it to disk. It returns
    /// the size in bytes of the stream directory and the page numbers of the page map.
    ///
    /// All pages are newly allocated from `self.pages`; nothing previously committed is
    /// overwritten, so a failure here leaves the old directory intact.
    fn write_new_stream_dir(&mut self) -> anyhow::Result<StreamDirInfo> {
        let _span = debug_span!("Msf::write_new_stream_dir").entered();

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);

        // "Dir" pages contain the contents of the Stream Directory.
        // "Map" pages contain pointers to "dir" pages. They are "above" the dir pages.

        let stream_dir = self.build_new_stream_dir();
        let stream_dir_bytes = stream_dir.as_bytes();

        // Bounce buffer for padding partial pages out to a full page of zeroes.
        let mut reusable_page_data: Vec<u8> = vec![0; usize::from(page_size)];

        // The number of pages needed to store the Stream Directory.
        let num_stream_dir_pages =
            num_pages_for_stream_size(stream_dir_bytes.len() as u32, page_size) as usize;
        let mut dir_pages: Vec<U32<LE>> = Vec::with_capacity(num_stream_dir_pages);

        for stream_dir_chunk in stream_dir_bytes.chunks(page_size_usize) {
            // Allocate a page for the next stream dir page.
            let page = self.pages.alloc_page();
            dir_pages.push(U32::new(page));

            let page_bytes = if stream_dir_chunk.len() == page_size_usize {
                // It's a complete page, so there is no need for the bounce buffer.
                stream_dir_chunk
            } else {
                let (lo, hi) = reusable_page_data.split_at_mut(stream_dir_chunk.len());
                lo.copy_from_slice(stream_dir_chunk);
                hi.fill(0);
                reusable_page_data.as_slice()
            };

            let page_offset = page_to_offset(page, page_size);
            debug!(page, page_offset, "writing stream dir page");
            self.file.write_all_at(page_bytes, page_offset)?;
        }

        // Now we build the next level of indirection (the "page map"), and allocate pages for them
        // and write them.
        let mut map_pages: Vec<U32<LE>> = Vec::new();

        // Each map page holds page_size / 4 little-endian u32 page numbers.
        let num_u32s_per_page = u32::from(page_size) / 4;
        for map_page_contents in dir_pages.chunks(num_u32s_per_page as usize) {
            let map_page_index = self.pages.alloc_page();
            let map_file_offset = page_to_offset(map_page_index, page_size);
            let map_page_bytes = map_page_contents.as_bytes();
            let (lo, hi) = reusable_page_data.split_at_mut(map_page_bytes.len());
            lo.copy_from_slice(map_page_bytes);
            hi.fill(0);

            debug!(
                map_page_index,
                map_file_offset, "writing stream dir page map page"
            );
            self.file
                .write_all_at(&reusable_page_data, map_file_offset)?;

            map_pages.push(U32::new(map_page_index));
        }

        Ok(StreamDirInfo {
            dir_size: stream_dir_bytes.len() as u32,
            dir_pages,
            map_pages,
        })
    }

    /// Writes the FPM for the new transaction state.
    ///
    /// `new_fpm_number` selects which of the two interleaved FPM copies (FPM1 or FPM2)
    /// receives the data; it is the FPM that is *not* currently active. One FPM page is
    /// written per interval, and any tail beyond the in-memory FPM is filled with 0xFF
    /// ("free") bits.
    fn write_fpm(&mut self, new_fpm_number: u32) -> anyhow::Result<()> {
        let _span = debug_span!("write_fpm").entered();

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);
        let num_intervals = self.pages.num_pages.div_round_up(page_size);

        assert_eq!(self.pages.num_pages as usize, self.pages.fpm.len());
        let fpm_words: &[u32] = self.pages.fpm.as_raw_slice();
        let fpm_bytes: &[u8] = fpm_words.as_bytes();

        // This iterates the contents of the pages of the FPM. Each item iterated is a &[u8]
        // containing the piece of the FPM that should be written to a single on-disk page.
        // The last page iterated can be a partial (incomplete) page.
        //
        // For example: page_size = 4096, so there are 4096 bytes in each FPM page within
        // an interval.  That means there are 4096 * 8 bits in each FPM page, or 32,768 bits.
        // These bits cover _much_ more than a single interval; each FPM page covers 8
        // intervals worth of pages.
        //
        // This is basically a bug in the design of the FPM; the FPM is 8x larger than it
        // needs to be. But the design is frozen, so we must do it this way.

        let mut fpm_pages_data_iter = fpm_bytes.chunks(page_size_usize);

        // This is a buffer where we assemble complete FPM pages before writing them to disk.
        // This ensures that we always write a complete page. This is more efficient for storage
        // stacks, since pages are usually larger than on-disk block sizes and are block-size
        // aligned, so this avoids the need for a read-modify-write cycle in the underlying
        // filesystem. This is only necessary for the last (partial) page.
        let mut fpm_page_buffer: Vec<u8> = vec![0; page_size_usize];

        for interval_index in 0..num_intervals {
            // Because each FPM page covers 8 intervals (see above), the iterator can run
            // out before the intervals do; the remaining pages are written as all-"free".
            let this_fpm_page_data = fpm_pages_data_iter.next().unwrap_or(&[]);
            assert!(this_fpm_page_data.len() <= fpm_page_buffer.len());

            let slice_to_write = if this_fpm_page_data.len() < page_size_usize {
                fpm_page_buffer[..this_fpm_page_data.len()].copy_from_slice(this_fpm_page_data);
                fpm_page_buffer[this_fpm_page_data.len()..].fill(0xff); // fill the rest with "free"
                fpm_page_buffer.as_slice()
            } else {
                // We already have a perfectly-sized slice. Just use it.
                this_fpm_page_data
            };

            let interval_page = interval_to_page(interval_index, page_size);
            let new_fpm_page = interval_page + new_fpm_number;

            debug!(interval = interval_index, "writing fpm chunk");

            self.file
                .write_all_at(slice_to_write, page_to_offset(new_fpm_page, page_size))?;
        }

        Ok(())
    }
}
400
401/// This ensures that the last few bits of the FPM are set to "free".
402///
403/// The MSPDB library uses a bit vector implementation that packs bits into an array of `u32`
404/// values, just as this Rust implementation does. However, if the number of _bits_ in the FPM
405/// is not a multiple of 32, then the MSPDB library accidentally reads the unaligned bits in the
406/// last `u32` and expects them to be "free".
407fn fill_last_word_of_fpm(fpm: &mut BitVec<u32, Lsb0>) {
408    let unaligned_len = fpm.len() & 0x1f;
409    if unaligned_len == 0 {
410        return;
411    }
412
413    let fpm_words = fpm.as_raw_mut_slice();
414    let last = fpm_words.last_mut().unwrap();
415
416    // Because unaligned_len is the result of masking with 0x1f, we know that the shift count
417    // cannot overflow.
418    *last |= 0xffff_ffff << unaligned_len;
419}
420
/// Information about the new Stream Directory that we just constructed and wrote to disk.
///
/// Returned by `write_new_stream_dir()`; `commit()` uses it to fill in the new Page 0
/// and, after the commit point, to mark the directory's pages as "freed".
struct StreamDirInfo {
    /// Size in bytes of the Stream Directory
    dir_size: u32,

    /// The list of pages that contain the Stream Directory
    dir_pages: Vec<U32<LE>>,

    /// The list of pages that contain the Stream Directory Map (the level _above_ `dir_pages`);
    /// these page numbers are copied into Page 0 at `STREAM_DIR_PAGE_MAP_FILE_OFFSET`.
    map_pages: Vec<U32<LE>>,
}
431}