1use std::io::{Read, Seek, SeekFrom, Write};
4use std::ops::Deref;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::{Arc, Mutex, MutexGuard, RwLock};
7
8use crate::constants::{
9 BAT_REGION_GUID, HEADER_BUFFER_SIZE, KNOWN_METADATA_GUIDS, KNOWN_REGION_GUIDS,
10 METADATA_REGION_GUID, MIB,
11};
12use crate::error::{Error, Result};
13use crate::header::Header;
14use crate::log_replay::ReplayOverlay;
15use crate::section::Sections;
16use crate::types::Guid;
17
18use super::{CreateOptions, LogReplayPolicy, OpenOptions, ParentResolver, ReadOnly};
19
20pub(crate) fn read_exact_at<T>(inner: &mut T, offset: u64, buf: &mut [u8]) -> std::io::Result<()>
21where
22 T: Read + Seek,
23{
24 inner.seek(SeekFrom::Start(offset))?;
25 inner.read_exact(buf)
26}
27
28pub(crate) fn write_all_at<T>(inner: &mut T, offset: u64, buf: &[u8]) -> std::io::Result<()>
29where
30 T: Write + Seek,
31{
32 inner.seek(SeekFrom::Start(offset))?;
33 inner.write_all(buf)
34}
35
36pub(crate) fn is_known_region_guid(guid: &Guid) -> bool {
45 KNOWN_REGION_GUIDS.contains(guid)
46}
47
48pub(crate) fn is_known_metadata_guid(guid: &Guid) -> bool {
50 KNOWN_METADATA_GUIDS.contains(guid)
51}
52
53pub struct Medium<T = std::fs::File> {
62 pub(super) inner: Mutex<T>,
63 pub(super) header_buf: RwLock<Option<CacheEntry>>,
65 pub(super) bat_buf: RwLock<Option<CacheEntry>>,
67 pub(super) metadata_buf: RwLock<Option<CacheEntry>>,
69 pub(super) log_buf: RwLock<Option<CacheEntry>>,
71 pub(super) generation: AtomicU64,
73 pub(super) write: bool,
75 pub(super) strict: bool,
77 pub(super) log_replay_policy: LogReplayPolicy,
79 pub(super) replay_overlay: Option<Arc<ReplayOverlay>>,
81 pub(crate) parent_resolver: Mutex<Option<Box<dyn ParentResolver + Send>>>,
83 pub(super) validator_buf: RwLock<Option<CacheEntry>>,
85}
86
87#[derive(Clone)]
88pub(crate) struct CacheEntry {
89 pub(super) generation: u64,
90 pub(super) bytes: Arc<[u8]>,
91}
92
93impl CacheEntry {
94 pub(super) fn new(generation: u64, bytes: Arc<[u8]>) -> Self {
95 Self { generation, bytes }
96 }
97
98 fn valid_bytes(&self, generation: u64) -> Option<Arc<[u8]>> {
99 (self.generation == generation).then(|| Arc::clone(&self.bytes))
100 }
101}
102
103pub struct InnerRef<'a, T> {
105 guard: MutexGuard<'a, T>,
106}
107
108impl<T> Deref for InnerRef<'_, T> {
109 type Target = T;
110
111 fn deref(&self) -> &Self::Target {
112 &self.guard
113 }
114}
115
116impl<T> std::fmt::Debug for Medium<T> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("Medium")
119 .field("write", &self.write)
120 .field("strict", &self.strict)
121 .field("log_replay_policy", &self.log_replay_policy)
122 .finish_non_exhaustive()
123 }
124}
125
126impl<T> Medium<T> {
127 pub fn get_ref(&self) -> InnerRef<'_, T> {
133 InnerRef {
134 guard: self.inner.lock().expect("medium inner lock poisoned"),
135 }
136 }
137
138 pub fn get_mut(&mut self) -> &mut T {
150 self.invalidate_all_caches();
151 self.inner.get_mut().expect("medium inner lock poisoned")
152 }
153
154 pub(crate) fn inner_mut(&mut self) -> &mut T {
155 self.inner.get_mut().expect("medium inner lock poisoned")
156 }
157
158 fn current_generation(&self) -> u64 {
159 self.generation.load(Ordering::Acquire)
160 }
161
162 fn bump_generation(&self) -> u64 {
163 self.generation.fetch_add(1, Ordering::AcqRel) + 1
164 }
165
166 fn invalidate_all_caches(&self) {
167 self.bump_generation();
168 if let Ok(mut cache) = self.header_buf.write() {
169 *cache = None;
170 }
171 if let Ok(mut cache) = self.bat_buf.write() {
172 *cache = None;
173 }
174 if let Ok(mut cache) = self.metadata_buf.write() {
175 *cache = None;
176 }
177 if let Ok(mut cache) = self.log_buf.write() {
178 *cache = None;
179 }
180 if let Ok(mut cache) = self.validator_buf.write() {
181 *cache = None;
182 }
183 }
184
185 pub fn into_inner(self) -> T {
187 self.inner
188 .into_inner()
189 .unwrap_or_else(std::sync::PoisonError::into_inner)
190 }
191
192 pub(crate) fn is_write(&self) -> bool {
194 self.write
195 }
196
197 pub(crate) fn replay_overlay_arc(&self) -> Option<&Arc<ReplayOverlay>> {
199 self.replay_overlay.as_ref()
200 }
201
202 pub fn create(inner: T) -> CreateOptions<T> {
204 CreateOptions {
205 inner: Some(inner),
206 virtual_size: 0,
207 fixed: false,
208 block_size: 32 * 1024 * 1024,
209 logical_sector_size: 4096,
210 physical_sector_size: 4096,
211 parent: None,
212 }
213 }
214
215 pub fn open(inner: T) -> OpenOptions<T, ReadOnly> {
220 OpenOptions {
221 inner,
222 strict: true,
223 log_replay_policy: LogReplayPolicy::Require,
224 parent_resolver: None,
225 _mode: std::marker::PhantomData,
226 }
227 }
228}
229
230impl<T> Medium<T>
231where
232 T: Read + Seek,
233{
234 pub fn io(&mut self) -> Result<crate::io::IO<'_, T>> {
240 crate::io::IO::new(self)
241 }
242}
243
244impl<T> Medium<T>
245where
246 T: Read + Write + Seek,
247{
248 pub(crate) fn write_bat_entry(&mut self, bat_array_idx: u64, raw_entry: [u8; 8]) -> Result<()> {
249 let header_buf = self.header_buf_arc()?;
250 let header = Header::new(&header_buf)?;
251 let rt = header.region_table(0)?;
252 let bat_region = rt
253 .entries()
254 .find(|entry| entry.guid() == BAT_REGION_GUID)
255 .ok_or_else(|| Error::InvalidFile("BAT region not found in region table".into()))?;
256 let entry_offset = bat_array_idx
257 .checked_mul(8)
258 .and_then(|offset| bat_region.file_offset().checked_add(offset))
259 .ok_or_else(|| Error::InvalidParameter("BAT entry offset overflow".into()))?;
260
261 write_all_at(self.inner_mut(), entry_offset, &raw_entry)?;
262 let generation = self.bump_generation();
263
264 let mut bat_cache = self
265 .bat_buf
266 .write()
267 .map_err(|_| Error::InvalidFile("BAT cache lock poisoned".into()))?;
268 if let Some(entry) = bat_cache.as_ref() {
269 let cache_offset = usize::try_from(bat_array_idx)
270 .map_err(|_| Error::InvalidParameter("BAT index does not fit usize".into()))?
271 .checked_mul(8)
272 .ok_or_else(|| Error::InvalidParameter("BAT cache offset overflow".into()))?;
273 let cache_end = cache_offset
274 .checked_add(8)
275 .ok_or_else(|| Error::InvalidParameter("BAT cache end overflow".into()))?;
276 if cache_end > entry.bytes.len() {
277 return Err(Error::InvalidParameter(
278 "BAT entry index exceeds cached BAT region".into(),
279 ));
280 }
281 let mut updated = entry.bytes.to_vec();
282 updated[cache_offset..cache_end].copy_from_slice(&raw_entry);
283 *bat_cache = Some(CacheEntry::new(generation, Arc::from(updated)));
284 }
285
286 *self
287 .validator_buf
288 .write()
289 .map_err(|_| Error::InvalidFile("validator cache lock poisoned".into()))? = None;
290
291 Ok(())
292 }
293}
294
295impl<T> Medium<T>
296where
297 T: Read + Seek,
298{
299 pub fn sections(&self) -> Result<Sections<'_, T>> {
307 Sections::new(self.header_buf_arc()?, self)
308 }
309
310 pub(crate) fn is_strict(&self) -> bool {
312 self.strict
313 }
314
315 #[cfg(test)]
317 pub(crate) fn log_replay_policy(&self) -> LogReplayPolicy {
318 self.log_replay_policy
319 }
320
321 pub fn validator(&mut self) -> Result<crate::validation::SpecValidator> {
335 crate::validation::SpecValidator::from_file(self)
336 }
337
338 pub(crate) fn header_buf_arc(&self) -> Result<Arc<[u8]>> {
340 let generation = self.current_generation();
341 if let Some(entry) = self
342 .header_buf
343 .read()
344 .map_err(|_| Error::InvalidFile("header cache lock poisoned".into()))?
345 .as_ref()
346 && let Some(bytes) = entry.valid_bytes(generation)
347 {
348 return Ok(bytes);
349 }
350
351 let mut buf = vec![0u8; HEADER_BUFFER_SIZE];
352 {
353 let mut inner = self
354 .inner
355 .lock()
356 .map_err(|_| Error::InvalidFile("medium inner lock poisoned".into()))?;
357 read_exact_at(&mut *inner, 0, &mut buf)?;
358 }
359 let buf = Arc::<[u8]>::from(buf);
360 let mut cache = self
361 .header_buf
362 .write()
363 .map_err(|_| Error::InvalidFile("header cache lock poisoned".into()))?;
364 if let Some(entry) = cache.as_ref()
365 && let Some(bytes) = entry.valid_bytes(generation)
366 {
367 return Ok(bytes);
368 }
369 *cache = Some(CacheEntry::new(generation, Arc::clone(&buf)));
370 Ok(buf)
371 }
372
373 pub(crate) fn bat_buf(&self) -> Result<Arc<[u8]>> {
382 let generation = self.current_generation();
383 if let Some(entry) = self
384 .bat_buf
385 .read()
386 .map_err(|_| Error::InvalidFile("BAT cache lock poisoned".into()))?
387 .as_ref()
388 && let Some(bytes) = entry.valid_bytes(generation)
389 {
390 return Ok(bytes);
391 }
392
393 let data = self.read_region_with_overlay(BAT_REGION_GUID, Self::read_bat_region)?;
394 let data = Arc::<[u8]>::from(data);
395
396 let mut cache = self
397 .bat_buf
398 .write()
399 .map_err(|_| Error::InvalidFile("BAT cache lock poisoned".into()))?;
400 if let Some(entry) = cache.as_ref()
401 && let Some(bytes) = entry.valid_bytes(generation)
402 {
403 return Ok(bytes);
404 }
405 *cache = Some(CacheEntry::new(generation, Arc::clone(&data)));
406
407 Ok(data)
408 }
409
410 pub(crate) fn metadata_buf(&self) -> Result<Arc<[u8]>> {
419 let generation = self.current_generation();
420 if let Some(entry) = self
421 .metadata_buf
422 .read()
423 .map_err(|_| Error::InvalidFile("metadata cache lock poisoned".into()))?
424 .as_ref()
425 && let Some(bytes) = entry.valid_bytes(generation)
426 {
427 return Ok(bytes);
428 }
429
430 let data =
431 self.read_region_with_overlay(METADATA_REGION_GUID, Self::read_metadata_region)?;
432 let data = Arc::<[u8]>::from(data);
433 let mut cache = self
434 .metadata_buf
435 .write()
436 .map_err(|_| Error::InvalidFile("metadata cache lock poisoned".into()))?;
437 if let Some(entry) = cache.as_ref()
438 && let Some(bytes) = entry.valid_bytes(generation)
439 {
440 return Ok(bytes);
441 }
442 *cache = Some(CacheEntry::new(generation, Arc::clone(&data)));
443 Ok(data)
444 }
445
446 fn read_region_with_overlay(
447 &self, region_guid: Guid, read_region: fn(&Self) -> Result<Vec<u8>>,
448 ) -> Result<Vec<u8>> {
449 let mut data = read_region(self)?;
450 if self.replay_overlay.is_none() {
451 return Ok(data);
452 }
453
454 let header_buf = self.header_buf_arc()?;
455 let header = Header::new(&header_buf)?;
456 let rt = header.region_table(0)?;
457 if let Some(entry) = rt.entries().find(|entry| entry.guid() == region_guid) {
458 self.apply_replay_overlay(&mut data, entry.file_offset());
459 }
460 Ok(data)
461 }
462 pub(crate) fn log_buf(&self) -> Result<Arc<[u8]>> {
468 let generation = self.current_generation();
469 if let Some(entry) = self
470 .log_buf
471 .read()
472 .map_err(|_| Error::InvalidFile("log cache lock poisoned".into()))?
473 .as_ref()
474 && let Some(bytes) = entry.valid_bytes(generation)
475 {
476 return Ok(bytes);
477 }
478
479 let mut data = self.read_log_region()?;
480 if self.replay_overlay.is_some() {
481 let header_buf = self.header_buf_arc()?;
482 let header = Header::new(&header_buf)?;
483 let current = header.header(0)?;
484 self.apply_replay_overlay(&mut data, current.log_offset());
485 }
486 let data = Arc::<[u8]>::from(data);
487 let mut cache = self
488 .log_buf
489 .write()
490 .map_err(|_| Error::InvalidFile("log cache lock poisoned".into()))?;
491 if let Some(entry) = cache.as_ref()
492 && let Some(bytes) = entry.valid_bytes(generation)
493 {
494 return Ok(bytes);
495 }
496 *cache = Some(CacheEntry::new(generation, Arc::clone(&data)));
497 Ok(data)
498 }
499
500 pub(crate) fn validator_buf(&mut self) -> Result<Arc<[u8]>> {
505 let generation = self.current_generation();
506 if let Some(entry) = self
507 .validator_buf
508 .read()
509 .map_err(|_| Error::InvalidFile("validator cache lock poisoned".into()))?
510 .as_ref()
511 && let Some(bytes) = entry.valid_bytes(generation)
512 {
513 return Ok(bytes);
514 }
515
516 let data = Arc::<[u8]>::from(self.build_validator_buf()?);
517 let mut cache = self
518 .validator_buf
519 .write()
520 .map_err(|_| Error::InvalidFile("validator cache lock poisoned".into()))?;
521 if let Some(entry) = cache.as_ref()
522 && let Some(bytes) = entry.valid_bytes(generation)
523 {
524 return Ok(bytes);
525 }
526 *cache = Some(CacheEntry::new(generation, Arc::clone(&data)));
527 Ok(data)
528 }
529
530 fn build_validator_buf(&mut self) -> Result<Vec<u8>> {
542 let header_buf = self.header_buf_arc()?;
544 let Ok(header) = Header::new(&header_buf) else {
545 return Ok(header_buf.to_vec());
546 };
547 let Ok(current) = header.header(0) else {
548 return Ok(header_buf.to_vec());
549 };
550 let Ok(rt) = header.region_table(0) else {
551 return Ok(header_buf.to_vec());
552 };
553
554 let log_offset = usize::try_from(current.log_offset()).unwrap();
555 let log_length = usize::try_from(current.log_length()).unwrap();
556 let header_log_guid = current.log_guid();
557
558 let mut max_end = (MIB as usize).max(log_offset + log_length);
560 for entry in rt.entries() {
561 let end = usize::try_from(entry.file_offset()).unwrap()
562 + usize::try_from(entry.length()).unwrap();
563 max_end = max_end.max(end);
564 }
565
566 let mut buf = vec![0u8; max_end];
567
568 let header_len = header_buf.len().min(MIB as usize);
570 buf[..header_len].copy_from_slice(&header_buf[..header_len]);
571
572 let has_zero_log_guid = header_log_guid.to_bytes() == [0u8; 16];
577 if log_offset > 0
578 && log_length > 0
579 && !has_zero_log_guid
580 && let Ok(log_data) = self.log_buf()
581 {
582 let copy_len = log_data.len().min(log_length);
583 let end = log_offset + copy_len;
584 if end <= max_end {
585 buf[log_offset..end].copy_from_slice(&log_data[..copy_len]);
586 }
587 }
588
589 let regions: Vec<_> = rt
591 .entries()
592 .map(|entry| {
593 (
594 entry.guid(),
595 usize::try_from(entry.file_offset()).unwrap(),
596 usize::try_from(entry.length()).unwrap(),
597 )
598 })
599 .collect();
600
601 for (guid, offset, length) in regions {
602 let region_data: Vec<u8> = if guid == BAT_REGION_GUID {
603 self.bat_buf()
604 .map(|bytes| bytes.to_vec())
605 .unwrap_or_default()
606 } else if guid == METADATA_REGION_GUID {
607 self.metadata_buf()
608 .map(|bytes| bytes.to_vec())
609 .unwrap_or_default()
610 } else {
611 continue;
612 };
613
614 if !region_data.is_empty() {
615 let copy_len = region_data.len().min(length);
616 let end = offset + copy_len;
617 if end <= max_end {
618 buf[offset..end].copy_from_slice(®ion_data[..copy_len]);
619 }
620 }
621 }
622
623 Ok(buf)
624 }
625
626 fn read_bat_region(&self) -> Result<Vec<u8>> {
630 let header_buf = self.header_buf_arc()?;
631 let header = Header::new(&header_buf)?;
632 let rt = header.region_table(0)?;
633 for entry in rt.entries() {
634 if entry.guid() == BAT_REGION_GUID {
635 let offset = entry.file_offset();
636 let length = entry.length() as usize;
637 let mut buf = vec![0u8; length];
638 let mut inner = self
639 .inner
640 .lock()
641 .map_err(|_| Error::InvalidFile("medium inner lock poisoned".into()))?;
642 read_exact_at(&mut *inner, offset, &mut buf)?;
643 return Ok(buf);
644 }
645 }
646 Err(Error::InvalidFile(
647 "BAT region not found in region table".into(),
648 ))
649 }
650
651 fn read_metadata_region(&self) -> Result<Vec<u8>> {
653 let header_buf = self.header_buf_arc()?;
654 let header = Header::new(&header_buf)?;
655 let rt = header.region_table(0)?;
656 for entry in rt.entries() {
657 if entry.guid() == METADATA_REGION_GUID {
658 let offset = entry.file_offset();
659 let length = entry.length() as usize;
660 let mut buf = vec![0u8; length];
661 let mut inner = self
662 .inner
663 .lock()
664 .map_err(|_| Error::InvalidFile("medium inner lock poisoned".into()))?;
665 read_exact_at(&mut *inner, offset, &mut buf)?;
666 return Ok(buf);
667 }
668 }
669 Err(Error::InvalidFile(
670 "Metadata region not found in region table".into(),
671 ))
672 }
673
674 fn read_log_region(&self) -> Result<Vec<u8>> {
676 let header_buf = self.header_buf_arc()?;
677 let header = Header::new(&header_buf)?;
678 let h = header.header(0)?;
679 let offset = h.log_offset();
680 let length = h.log_length() as usize;
681 let mut buf = vec![0u8; length];
682 let mut inner = self
683 .inner
684 .lock()
685 .map_err(|_| Error::InvalidFile("medium inner lock poisoned".into()))?;
686 read_exact_at(&mut *inner, offset, &mut buf)?;
687 Ok(buf)
688 }
689
690 fn apply_replay_overlay(&self, region_data: &mut [u8], region_offset: u64) {
692 if let Some(ref overlay) = self.replay_overlay {
693 overlay.apply_to_region(region_data, region_offset);
694 }
695 }
696}