read_url/tar/url.rs

use super::{
    super::{context::*, url::*, util::*},
    tar_url::*,
};

impl URL for TarUrl {
    fn context(&self) -> &UrlContext {
        &*self.context
    }

    fn query(&self) -> Option<UrlQuery> {
        self.archive_url.query()
    }

    fn fragment(&self) -> Option<String> {
        self.archive_url.fragment()
    }

    fn format(&self) -> Option<String> {
        get_format_from_path(&self.path)
    }

    fn base(&self) -> Option<UrlRef> {
        get_relative_path_parent(&self.path).map(|path| self.new_with(path).into())
    }

    fn relative(&self, path: &str) -> UrlRef {
        self.new_with(self.path.join(path)).into()
    }

    #[cfg(feature = "blocking")]
    fn conform(&mut self) -> Result<(), super::super::UrlError> {
        // (We assume the archive URL has already been conformed)

        // Note that tar entries could have relative or absolute paths
        // (though absolute paths are rare), so we cannot conform to an absolute path
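        // (For example, assuming normalize() resolves "." and ".." segments
        // lexically, "a/./b/../c" would become "a/c")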
        self.path = self.path.normalize();

        // Opening verifies that the entry actually exists
        self.open()?;

        Ok(())
    }

    #[cfg(feature = "async")]
    fn conform_async(&self) -> Result<ConformFuture, super::super::UrlError> {
        use super::super::errors::*;

        async fn conform_async(mut url: TarUrl) -> Result<UrlRef, UrlError> {
            // (We assume the archive URL has already been conformed)

            // Note that tar entries could have relative or absolute paths
            // (though absolute paths are rare), so we cannot conform to an absolute path
            url.path = url.path.normalize();

            // Unlike the blocking version, the returned future must be awaited for
            // the open (and thus the existence check) to actually happen
            let _ = url.open_async()?.await?;

            Ok(url.into())
        }

        Ok(Box::pin(conform_async(self.clone())))
    }

    #[cfg(feature = "blocking")]
    fn open(&self) -> Result<ReadRef, super::super::UrlError> {
        use {
            super::{super::errors::*, compression::*},
            kutil::io::reader::*,
            std::{io, str},
            tar::*,
        };

        let mut reader = self.archive_url.open()?;

        // Decompression
        match self.get_compression() {
            TarCompression::None => {}

            #[cfg(feature = "gzip")]
            TarCompression::Gzip => {
                use {flate2::read::*, tracing::info};
                info!("gzip decompression (blocking)");
                reader = Box::new(GzDecoder::new(io::BufReader::new(reader)));
            }

            #[cfg(feature = "zstandard")]
            TarCompression::Zstandard => {
                use {tracing::info, zstd::stream::*};
                info!("zstd decompression (blocking)");
                reader = Box::new(Decoder::new(io::BufReader::new(reader))?);
            }

            #[cfg(not(all(feature = "gzip", feature = "zstandard")))]
            compression => return Err(UrlError::UnsupportedFormat(compression.to_string())),
        }

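        // Note that Archive takes ownership of our reader; we will get it back
        // below via into_inner() once we have scanned to the right entry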
        let mut archive = Archive::new(reader);

        // Advance the reader to the beginning of the tar entry
        let mut size = None;
        for entry in archive.entries()? {
            let entry = entry?;
            if let Ok(path) = str::from_utf8(&entry.path_bytes()) {
                if path == self.path {
                    size = Some(entry.size() as usize);
                    break;
                }
            }
        }

        // It might seem like an unreliable trick to assume that the reader is now
        // at the right place; after all, this is undocumented, internal behavior
        // of the "tar" crate.

        // However, it is 1) correct, 2) a reasonable expectation considering the
        // crate's *external* design surface, and 3) our only real choice (aside
        // from resorting to "unsafe"), because we cannot otherwise disentangle the
        // references from Entry to Entries to Archive and return a movable
        // io::Read for an Entry.
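
        // (Buffering the entire entry into memory while we still hold it would
        // also yield a movable reader, e.g. with a mutable `entry`:
        //
        //     let mut buffer = Vec::with_capacity(size);
        //     io::copy(&mut entry, &mut buffer)?;
        //     return Ok(Box::new(io::Cursor::new(buffer)));
        //
        // but that would defeat streaming for large entries)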

        match size {
            Some(size) => {
                // Get our reader back, now at the right place
                let reader = archive.into_inner();

                // BoundedReader will make sure we don't read beyond our entry
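                // (the raw stream would otherwise continue into the entry's
                // padding and any subsequent entries)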
                Ok(Box::new(BoundedReader::new(reader, size)))
            }

            None => Err(UrlError::new_io_not_found(self)),
        }
    }

    #[cfg(feature = "async")]
    fn open_async(&self) -> Result<OpenFuture, super::super::UrlError> {
        use {
            super::{super::errors::*, compression::*},
            futures::*,
            std::str,
            tokio_tar::*,
        };

        // Note that we are using a fork of async-tar that uses Tokio instead of Futures.
        // Let's hope it stays maintained! Otherwise, we could also use tokio-util Compat
        // with async-tar.
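        // (That route would look roughly like this, assuming async-tar's Archive
        // accepts a futures AsyncRead:
        //
        //     use tokio_util::compat::TokioAsyncReadCompatExt;
        //     let archive = async_tar::Archive::new(reader.compat());
        // )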

        async fn open_async(url: TarUrl) -> Result<AsyncReadRef, UrlError> {
            let mut reader = url.archive_url.open_async()?.await?;

            // Decompression
            match url.get_compression() {
                TarCompression::None => {}

                #[cfg(feature = "gzip")]
                TarCompression::Gzip => {
                    use {async_compression::tokio::bufread::*, tokio::io::*, tracing::info};
                    info!("gzip decompression (asynchronous)");
                    reader = Box::pin(GzipDecoder::new(BufReader::new(reader)));
                }

                #[cfg(feature = "zstandard")]
                TarCompression::Zstandard => {
                    use {async_compression::tokio::bufread::*, tokio::io::*, tracing::info};
                    info!("zstd decompression (asynchronous)");
                    reader = Box::pin(ZstdDecoder::new(BufReader::new(reader)));
                }

                #[cfg(not(all(feature = "gzip", feature = "zstandard")))]
                compression => return Err(UrlError::UnsupportedFormat(compression.to_string())),
            }

            let mut archive = Archive::new(reader);

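            // Unlike in the blocking version, we can return the matched entry
            // itself: tokio-tar's Entry implements AsyncRead and yields only the
            // entry's own data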
            let mut entries = archive.entries()?;
            while let Some(entry) = entries.next().await {
                let entry = entry?;
                if let Ok(path) = str::from_utf8(&entry.path_bytes()) {
                    if path == url.path {
                        return Ok(Box::pin(entry));
                    }
                }
            }

            Err(UrlError::new_io_not_found(url))
        }

        Ok(Box::pin(open_async(self.clone())))
    }
}
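
// A minimal usage sketch; how a TarUrl is constructed is not shown in this
// file, so the URL notation and the context.url() call below are hypothetical:
//
//     let url = context.url("tar:http://host/archive.tar.gz!path/in/archive.txt")?;
//     let mut reader = url.open()?;
//     let mut content = String::new();
//     reader.read_to_string(&mut content)?;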