// ms_pdb_msf/commit.rs
1//! Commits all pending changes to an MSF file.
2
3use super::*;
4use anyhow::Result;
5use tracing::{debug, debug_span, info, info_span, trace, trace_span};
6
impl<F: ReadAt + WriteAt> Msf<F> {
    /// Commits all changes to the MSF file to disk.
    ///
    /// The MSF file format is designed to support a very limited form of transactional commits.
    /// A single block write to the first page (located at the start of the file) can atomically
    /// commit all outstanding modifications to the MSF file.
    ///
    /// This commit design does require that the underlying operating system handle the write to
    /// page 0 in a single operation. Operating systems generally don't _contractually_
    /// guarantee atomicity of such writes, but in practice it's true enough to be reasonably
    /// reliable. The commit functionality of MSF is really designed to guard against application
    /// failures, not failures of the operating system or its storage stack. So don't rely on
    /// `commit()` for anything more than best-effort service.
    ///
    /// This `commit()` implementation _does not_ permit multiple concurrent writers (or concurrent
    /// readers and writers) to the underlying MSF/PDB file. If you manage to circumvent this,
    /// you may damage the underlying MSF/PDB file.
    ///
    /// This `commit()` implementation does not buffer modifications to stream contents. All
    /// writes to stream contents are handled by immediately writing the data to the underlying
    /// MSF file. However, these writes do not _overwrite_ the data stored in the stream; instead,
    /// new pages are allocated in the file for the new or modified stream contents. If an
    /// application deletes a range of a stream (by resizing it), then the existing pages are
    /// protected and cannot be overwritten until `commit()` is called.
    ///
    /// Commit operations are moderately expensive. Applications that modify PDBs should generally
    /// perform a single commit operation, not a series of commit operations. Each call to
    /// `commit()` writes a new copy of the Stream Directory (the list of streams and their page
    /// numbers) to disk. Because the existing Stream Directory cannot be overwritten (until
    /// _after_ the `commit()` operation completes), this means that all `commit()` operations
    /// require that two complete copies of the Stream Directory persist on disk.
    ///
    /// While it would be possible to reduce the size of an MSF file (after a commit completes) by
    /// trimming the unused pages at the end of the MSF file, in practice this does not help much
    /// because of page fragmentation. For this reason, `commit()` never reduces the size of the
    /// underlying MSF file.
    ///
    /// Returns `Ok(true)` if this `Msf` contained uncommitted changes and these changes have now
    /// been committed.
    ///
    /// Returns `Ok(false)` if this `Msf` did not contain any uncommitted changes. In this case,
    /// no `write()` calls are issued to the underlying storage.
    ///
    /// If this function returns `Err`, then the underlying MSF file may have had new pages written,
    /// but the existing Stream Directory and header page should be intact. However, if the
    /// underlying operating system did not write Page 0 atomically, then the underlying MSF
    /// file may be irrecoverably damaged.
    ///
    /// Also, if this function returns `Err`, the in-memory data structures that represent the
    /// state of the `Msf` editor are not guaranteed to be in a consistent state.
    pub fn commit(&mut self) -> Result<bool> {
        let _span = info_span!("Msf::commit").entered();

        self.assert_invariants();

        // If this was not opened for write access then there are no pending changes at all.
        if self.access_mode != AccessMode::ReadWrite {
            info!("this Msf is not opened for read-write access");
            debug_assert!(self.modified_streams.is_empty());
            return Ok(false);
        };

        // We only support modifying Big MSF files.
        assert_eq!(self.kind, MsfKind::Big);

        // If no streams have been modified, then there is nothing to do.
        if self.modified_streams.is_empty() {
            info!("there are no modified streams; nothing to commit");
            return Ok(false);
        }

        // The FPM is double-buffered: we always write the *inactive* FPM slot, so the
        // currently-active FPM stays valid on disk until Page 0 is rewritten below.
        let new_fpm_number: u32 = match self.active_fpm {
            FPM_NUMBER_1 => FPM_NUMBER_2,
            FPM_NUMBER_2 => FPM_NUMBER_1,
            _ => panic!("Active FPM has invalid value"),
        };
        info!(
            old_fpm = self.active_fpm,
            new_fpm = new_fpm_number,
            "beginning commit"
        );

        // Write the new Stream Directory (and its page map) to freshly-allocated pages.
        // Nothing on disk references these pages until Page 0 is rewritten.
        let stream_dir_info = self.write_new_stream_dir()?;

        // NOTE: The call to merge_freed_into_free irreversibly alters state. If we fail after
        // this point, then Msf will be left in an inconsistent state. This could be improved by
        // building a new FPM vector in-memory without modifying any state.
        self.pages.merge_freed_into_free();
        fill_last_word_of_fpm(&mut self.pages.fpm);

        self.write_fpm(new_fpm_number)?;

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);

        // Build the new Page 0. Unused bytes in the page are zero.
        let mut page0: Vec<u8> = vec![0; page_size_usize];

        let msf_header = MsfHeader {
            magic: MSF_BIG_MAGIC,
            page_size: U32::new(u32::from(page_size)),
            active_fpm: U32::new(new_fpm_number),
            num_pages: U32::new(self.pages.num_pages),
            stream_dir_size: U32::new(stream_dir_info.dir_size),
            stream_dir_small_page_map: U32::new(0),
            // The stream directory page map pointers follows the MsfHeader.
        };
        msf_header.write_to_prefix(page0.as_mut_slice()).unwrap();

        // Copy the stream dir page map into Page 0.
        let page_map_pages_bytes = stream_dir_info.map_pages.as_bytes();
        page0[STREAM_DIR_PAGE_MAP_FILE_OFFSET as usize..][..page_map_pages_bytes.len()]
            .copy_from_slice(page_map_pages_bytes);

        // ------------------------ THE BIG COMMIT ----------------------
        // This single page write atomically switches the file over to the new
        // Stream Directory and the new FPM slot.

        info!("writing MSF File Header");
        self.file.write_all_at(page0.as_bytes(), 0)?;

        // After this point, _nothing can fail_.
        // Any operation that could have failed should have been moved above the commit point.

        // --------------------- CLEANUP AFTER THE COMMIT ---------------

        // Update in-memory state to reflect the commit.
        //
        // This code runs after we write the new Page 0 to disk. That commits the changes to the
        // PDB. This function modifies in-memory state to reflect the successful commit. For this
        // reason, after this point, this function must NEVER return a failure code.
        {
            // Build the new in-memory stream directory. This is very similar to the version that we
            // just wrote to disk, so maybe we should unify the two.

            let _span = trace_span!("post_commit").entered();

            let page_size = self.pages.page_size;

            // We can easily determine the right size for allocating 'stream_pages'.
            let mut num_stream_pages: usize = 0;
            for &stream_size in self.stream_sizes.iter() {
                if stream_size != NIL_STREAM_SIZE {
                    num_stream_pages += num_pages_for_stream_size(stream_size, page_size) as usize;
                }
            }

            // Flattened page list for all streams, plus per-stream start offsets into it
            // (one extra entry at the end so that starts[i]..starts[i+1] is always valid).
            let mut stream_pages: Vec<Page> = Vec::with_capacity(num_stream_pages);
            let mut stream_page_starts: Vec<u32> = Vec::with_capacity(self.stream_sizes.len() + 1);

            for (stream, &stream_size) in self.stream_sizes.iter().enumerate() {
                stream_page_starts.push(stream_pages.len() as u32);

                if stream_size == NIL_STREAM_SIZE {
                    trace!(stream, "stream is nil");
                    continue;
                }

                let num_stream_pages = num_pages_for_stream_size(stream_size, page_size) as usize;

                // If this stream has been modified, then return the modified page list.
                // Otherwise fall back to the page list from the previous commit.
                let is_modified;
                let pages: &[Page] =
                    if let Some(pages) = self.modified_streams.get(&(stream as u32)) {
                        is_modified = true;
                        pages
                    } else {
                        is_modified = false;
                        let start = self.committed_stream_page_starts[stream] as usize;
                        &self.committed_stream_pages[start..start + num_stream_pages]
                    };
                assert_eq!(num_stream_pages, pages.len());

                trace!(stream, stream_size, num_stream_pages, is_modified);

                stream_pages.extend_from_slice(pages);
            }

            stream_page_starts.push(stream_pages.len() as u32);

            // Now that we have written the Stream Directory (and the map pages, above it), we
            // need to mark the pages that contain the Stream Directory (and the map pages) as
            // *freed*. Not free, but *freed*. Fortunately, we still have this information, since
            // we built it above when we called write_new_stream_dir().
            //
            // The multiple_commits() and many_commits() tests verify this.
            {
                trace!("marking stream dir pages as free");

                for list in [
                    stream_dir_info.dir_pages.as_slice(),
                    stream_dir_info.map_pages.as_slice(),
                ] {
                    for &p in list.iter() {
                        let pi = p.get() as usize;
                        // These pages were just allocated, so they must be neither
                        // freed nor free yet.
                        assert!(!self.pages.fpm_freed[pi]);
                        assert!(!self.pages.fpm[pi]);
                        self.pages.fpm_freed.set(pi, true);
                    }
                }
            }

            // Update state
            self.committed_stream_pages = stream_pages;
            self.committed_stream_page_starts = stream_page_starts;
            self.modified_streams.clear();

            self.pages.fresh.set_elements(0);
            self.pages.next_free_page_hint = 3; // positioned after file header and FPM1 and FPM2

            trace!(new_fpm_number, "setting active FPM");
            self.active_fpm = new_fpm_number;
        }

        info!("commit complete");

        self.assert_invariants();

        Ok(true)
    }

    /// Builds the new stream directory as a flat array of little-endian `u32` values:
    /// the stream count, then one size per stream, then the page numbers of every
    /// non-nil stream in order.
    fn build_new_stream_dir(&self) -> Vec<U32<LE>> {
        let page_size = self.pages.page_size;

        let num_streams = self.stream_sizes.len();

        let mut stream_dir: Vec<U32<LE>> = Vec::new();
        stream_dir.push(U32::new(num_streams as u32));

        // Push a size of 0 for Stream 0.
        stream_dir.push(U32::new(0));

        for &stream_size in self.stream_sizes[1..].iter() {
            stream_dir.push(U32::new(stream_size));
        }

        for (stream, &stream_size) in self.stream_sizes.iter().enumerate() {
            if stream_size == NIL_STREAM_SIZE {
                debug!(stream, "stream is nil");
                continue;
            }

            let num_stream_pages = num_pages_for_stream_size(stream_size, page_size) as usize;

            // If this stream has been modified, then return the modified page list.
            // Otherwise use the page list from the previous commit.
            let pages: &[Page] = if let Some(pages) = self.modified_streams.get(&(stream as u32)) {
                pages
            } else {
                let start = self.committed_stream_page_starts[stream] as usize;
                &self.committed_stream_pages[start..start + num_stream_pages]
            };
            assert_eq!(num_stream_pages, pages.len());
            debug!(stream, stream_size);

            stream_dir.reserve(pages.len());
            for &p in pages.iter() {
                stream_dir.push(U32::new(p));
            }
        }

        stream_dir
    }

    /// Builds the new stream directory and writes it to disk.
    ///
    /// This builds the stream directory and the page map pages and writes it to disk. It returns
    /// the size in bytes of the stream directory and the page numbers of the page map.
    ///
    /// All pages written here are newly allocated; nothing on disk points at them until
    /// the caller rewrites Page 0, so a failure here leaves the committed file intact.
    fn write_new_stream_dir(&mut self) -> anyhow::Result<StreamDirInfo> {
        let _span = debug_span!("Msf::write_new_stream_dir").entered();

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);

        // "Dir" pages contain the contents of the Stream Directory.
        // "Map" pages contain pointers to "dir" pages. They are "above" the dir pages.

        let stream_dir = self.build_new_stream_dir();
        let stream_dir_bytes = stream_dir.as_bytes();

        // Bounce buffer used to zero-pad partial pages so we always write full pages.
        let mut reusable_page_data: Vec<u8> = vec![0; usize::from(page_size)];

        // The number of pages needed to store the Stream Directory.
        let num_stream_dir_pages =
            num_pages_for_stream_size(stream_dir_bytes.len() as u32, page_size) as usize;
        let mut dir_pages: Vec<U32<LE>> = Vec::with_capacity(num_stream_dir_pages);

        for stream_dir_chunk in stream_dir_bytes.chunks(page_size_usize) {
            // Allocate a page for the next stream dir page.
            let page = self.pages.alloc_page();
            dir_pages.push(U32::new(page));

            let page_bytes = if stream_dir_chunk.len() == page_size_usize {
                // It's a complete page, so there is no need for the bounce buffer.
                stream_dir_chunk
            } else {
                let (lo, hi) = reusable_page_data.split_at_mut(stream_dir_chunk.len());
                lo.copy_from_slice(stream_dir_chunk);
                hi.fill(0);
                reusable_page_data.as_slice()
            };

            let page_offset = page_to_offset(page, page_size);
            debug!(page, page_offset, "writing stream dir page");
            self.file.write_all_at(page_bytes, page_offset)?;
        }

        // Now we build the next level of indirection (the "page map"), and allocate pages for them
        // and write them.
        let mut map_pages: Vec<U32<LE>> = Vec::new();

        let num_u32s_per_page = u32::from(page_size) / 4;
        for map_page_contents in dir_pages.chunks(num_u32s_per_page as usize) {
            let map_page_index = self.pages.alloc_page();
            let map_file_offset = page_to_offset(map_page_index, page_size);
            let map_page_bytes = map_page_contents.as_bytes();
            let (lo, hi) = reusable_page_data.split_at_mut(map_page_bytes.len());
            lo.copy_from_slice(map_page_bytes);
            hi.fill(0);

            debug!(
                map_page_index,
                map_file_offset, "writing stream dir page map page"
            );
            self.file
                .write_all_at(&reusable_page_data, map_file_offset)?;

            map_pages.push(U32::new(map_page_index));
        }

        Ok(StreamDirInfo {
            dir_size: stream_dir_bytes.len() as u32,
            dir_pages,
            map_pages,
        })
    }

    /// Writes the FPM for the new transaction state.
    ///
    /// One FPM page is written per interval, into the FPM slot selected by
    /// `new_fpm_number` (page `interval_start + new_fpm_number` of each interval).
    fn write_fpm(&mut self, new_fpm_number: u32) -> anyhow::Result<()> {
        let _span = debug_span!("write_fpm").entered();

        let page_size = self.pages.page_size;
        let page_size_usize = usize::from(page_size);
        let num_intervals = self.pages.num_pages.div_round_up(page_size);

        assert_eq!(self.pages.num_pages as usize, self.pages.fpm.len());
        // NOTE(review): viewing the `u32` FPM words as raw bytes assumes a little-endian
        // host matches the on-disk MSF byte order — TODO confirm this crate only targets
        // little-endian hosts or that the FPM word order is handled elsewhere.
        let fpm_words: &[u32] = self.pages.fpm.as_raw_slice();
        let fpm_bytes: &[u8] = fpm_words.as_bytes();

        // This iterates the contents of the pages of the FPM. Each item iterated is a &[u8]
        // containing the piece of the FPM that should be written to a single on-disk page.
        // The last page iterated can be a partial (incomplete) page.
        //
        // For example: page_size = 4096, so there are 4096 bytes in each FPM page within
        // an interval. That means there are 4096 * 8 bits in each FPM page, or 32,768 bits.
        // These bits cover _much_ more than a single interval; each FPM page covers 8
        // intervals worth of pages.
        //
        // This is basically a bug in the design of the FPM; the FPM is 8x larger than it
        // needs to be. But the design is frozen, so we must do it this way.

        let mut fpm_pages_data_iter = fpm_bytes.chunks(page_size_usize);

        // This is a buffer where we assemble complete FPM pages before writing them to disk.
        // This ensures that we always write a complete page. This is more efficient for storage
        // stacks, since pages are usually larger than on-disk block sizes and are block-size
        // aligned, so this avoids the need for a read-modify-write cycle in the underlying
        // filesystem. This is only necessary for the last (partial) page.
        let mut fpm_page_buffer: Vec<u8> = vec![0; page_size_usize];

        for interval_index in 0..num_intervals {
            // Intervals past the end of the FPM data still get a page written,
            // filled entirely with "free" bits.
            let this_fpm_page_data = fpm_pages_data_iter.next().unwrap_or(&[]);
            assert!(this_fpm_page_data.len() <= fpm_page_buffer.len());

            let slice_to_write = if this_fpm_page_data.len() < page_size_usize {
                fpm_page_buffer[..this_fpm_page_data.len()].copy_from_slice(this_fpm_page_data);
                fpm_page_buffer[this_fpm_page_data.len()..].fill(0xff); // fill the rest with "free"
                fpm_page_buffer.as_slice()
            } else {
                // We already have a perfectly-sized slice. Just use it.
                this_fpm_page_data
            };

            let interval_page = interval_to_page(interval_index, page_size);
            let new_fpm_page = interval_page + new_fpm_number;

            debug!(interval = interval_index, "writing fpm chunk");

            self.file
                .write_all_at(slice_to_write, page_to_offset(new_fpm_page, page_size))?;
        }

        Ok(())
    }
}
400
401/// This ensures that the last few bits of the FPM are set to "free".
402///
403/// The MSPDB library uses a bit vector implementation that packs bits into an array of `u32`
404/// values, just as this Rust implementation does. However, if the number of _bits_ in the FPM
405/// is not a multiple of 32, then the MSPDB library accidentally reads the unaligned bits in the
406/// last `u32` and expects them to be "free".
407fn fill_last_word_of_fpm(fpm: &mut BitVec<u32, Lsb0>) {
408 let unaligned_len = fpm.len() & 0x1f;
409 if unaligned_len == 0 {
410 return;
411 }
412
413 let fpm_words = fpm.as_raw_mut_slice();
414 let last = fpm_words.last_mut().unwrap();
415
416 // Because unaligned_len is the result of masking with 0x1f, we know that the shift count
417 // cannot overflow.
418 *last |= 0xffff_ffff << unaligned_len;
419}
420
/// Information about the new Stream Directory that we just constructed and wrote to disk.
///
/// Produced by `write_new_stream_dir()`; `commit()` uses it to fill in the MSF header and
/// Page 0 page map, and afterwards to mark the directory's pages as *freed* (so they can be
/// reclaimed only after the _next_ commit).
struct StreamDirInfo {
    /// Size in bytes of the Stream Directory
    dir_size: u32,

    /// The list of pages that contain the Stream Directory
    dir_pages: Vec<U32<LE>>,

    /// The list of pages that contain the Stream Directory Map (the level _above_ `dir_pages`);
    /// these page numbers are what gets embedded in Page 0
    map_pages: Vec<U32<LE>>,
}