zstd_framed/table/
futures.rs

1#![cfg(feature = "futures")]
2
3use super::{ZstdFrame, ZstdFrameSize, ZstdSeekTable};
4
5use futures::io::{AsyncReadExt as _, AsyncSeekExt as _};
6
7/// Read the seek table from the end of a [zstd seekable format] stream.
8///
9/// Returns `Ok(None)` if the stream doesn't apper to contain a seek table.
10/// Otherwise, returns `Err(_)` if the seek table could not be parsed or
11/// if an I/O error occurred while trying to read the seek table. If it
12/// returns `Ok(_)`, it will also restore the reader to its original
13/// stream position.
14///
15/// The seek table is returned as-is from the underlying reader. No attempt
16/// is made to validate that the seek table lines up with the underlying
17/// zstd stream. This means a malformed seek table could have out-of-bounds
18/// offsets, could omit sections of the underyling stream, or could be
19/// misaligned from frames of the underlying stream.
20///
21/// Other implementations:
22///
23/// - sync I/O: [`crate::table::read_seek_table`]
24/// - `tokio`: [`crate::table::tokio::read_seek_table`]
25///
26/// [zstd seekable format]: https://github.com/facebook/zstd/tree/51eb7daf39c8e8a7c338ba214a9d4e2a6a086826/contrib/seekable_format
27pub async fn read_seek_table<R>(mut reader: R) -> std::io::Result<Option<ZstdSeekTable>>
28where
29    R: Unpin + futures::AsyncRead + futures::AsyncSeek,
30{
31    // Get the stream position, so we can restore it later
32    let initial_position = reader.stream_position().await?;
33
34    // Read the seek table
35    let seek_table_result = read_seek_table_inner(&mut reader).await;
36
37    // Try to restore the seek position, even if reading
38    // the seek table failed
39    let seek_result = reader
40        .seek(std::io::SeekFrom::Start(initial_position))
41        .await;
42
43    // If we got an error, return whichever we got first
44    let seek_table = seek_table_result?;
45    seek_result?;
46
47    Ok(seek_table)
48}
49
50async fn read_seek_table_inner<R>(mut reader: R) -> std::io::Result<Option<ZstdSeekTable>>
51where
52    R: Unpin + futures::AsyncRead + futures::AsyncSeek,
53{
54    // Seek to the start of the zstd seek table footer
55    reader.seek(std::io::SeekFrom::End(-9)).await?;
56
57    // Read the footer fields: number of frames (4 bytes),
58    // table descriptor (1 byte), and the magic number (4 bytes)
59    let mut num_frames_bytes = [0; 4];
60    reader.read_exact(&mut num_frames_bytes).await?;
61
62    let mut seek_table_descriptor_bytes = [0; 1];
63    reader.read_exact(&mut seek_table_descriptor_bytes).await?;
64
65    let mut seekable_magic_number_bytes = [0; 4];
66    reader.read_exact(&mut seekable_magic_number_bytes).await?;
67
68    // Return if the magic number doesn't match
69    if seekable_magic_number_bytes != crate::SEEKABLE_FOOTER_MAGIC_BYTES {
70        return Ok(None);
71    }
72
73    // Parse the number of frames
74    let num_frames = u32::from_le_bytes(num_frames_bytes);
75
76    // Validate the seek table descriptor
77    let [seek_table_descriptor] = seek_table_descriptor_bytes;
78    let has_checksum = seek_table_descriptor & 0b1000_0000 != 0;
79    let is_reserved_valid = seek_table_descriptor & 0b0111_1100 == 0;
80
81    if !is_reserved_valid {
82        return Err(std::io::Error::other(
83            "zstd seek table has unsupported descriptor",
84        ));
85    }
86
87    // Determine the table entry size (8 bytes, or 12 bytes with checksums)
88    let table_entry_size: u32 = if has_checksum { 12 } else { 8 };
89
90    // Calculate the full size of the skippable frame containing the
91    // seek table. This can't overflow for a valid seek table, since the
92    // frame size is part of the frame header.
93    let table_frame_size = table_entry_size
94        .checked_mul(num_frames)
95        .and_then(|size| size.checked_add(9))
96        .ok_or_else(|| std::io::Error::other("zstd seek table size overflowed"))?;
97
98    // Seek to the start of the skippable frame containing the seek table
99    reader
100        .seek(std::io::SeekFrom::Current(-i64::from(table_frame_size) - 8))
101        .await?;
102
103    // Read the skippable frame magic number header: the
104    // magic number (4 bytes) and the frame size (4 bytes)
105    let mut skippable_magic_number_bytes = [0; 4];
106    reader.read_exact(&mut skippable_magic_number_bytes).await?;
107
108    let mut actual_table_frame_size_bytes = [0; 4];
109    reader
110        .read_exact(&mut actual_table_frame_size_bytes)
111        .await?;
112
113    // Validate the skippable frame magic number and frame size
114    if skippable_magic_number_bytes != crate::SKIPPABLE_HEADER_MAGIC_BYTES {
115        return Err(std::io::Error::other(
116            "zstd seek table has unsupported skippable frame magic number",
117        ));
118    }
119
120    let actual_table_frame_size = u32::from_le_bytes(actual_table_frame_size_bytes);
121    if actual_table_frame_size != table_frame_size {
122        return Err(std::io::Error::other("zstd seek table size did not match"));
123    }
124
125    // Read each table entry
126    let mut table = ZstdSeekTable::empty();
127    let mut compressed_pos = 0;
128    let mut decompressed_pos = 0;
129    for frame_index in 0..num_frames {
130        let frame_index = usize::try_from(frame_index).unwrap();
131
132        // Read the compressed size
133        let mut compressed_size_bytes = [0; 4];
134        reader.read_exact(&mut compressed_size_bytes).await?;
135        let compressed_size = u32::from_le_bytes(compressed_size_bytes);
136
137        // Read the decompressed size
138        let mut decompressed_size_bytes = [0; 4];
139        reader.read_exact(&mut decompressed_size_bytes).await?;
140        let decompressed_size = u32::from_le_bytes(decompressed_size_bytes);
141
142        // Skip the checksum if present
143        if has_checksum {
144            reader.seek(std::io::SeekFrom::Current(4)).await?;
145        }
146
147        let frame = ZstdFrame {
148            compressed_pos,
149            decompressed_pos,
150            index: frame_index,
151            size: ZstdFrameSize {
152                compressed_size: compressed_size.into(),
153                decompressed_size: decompressed_size.into(),
154            },
155        };
156        table.insert(frame);
157
158        compressed_pos += u64::from(compressed_size);
159        decompressed_pos += u64::from(decompressed_size);
160    }
161
162    Ok(Some(table))
163}