1use std::mem::MaybeUninit;
2
3use crate::{ Error, Result };
4use crate::util::AsInit;
5
6use super::{ Cache, CacheEntry, CacheEntryData };
7
8#[derive(Debug)]
9pub struct Uri {
10 uri: url::Url,
11 size: Option<u64>,
12 cache: Option<CacheEntry>,
13 pos: u64,
14 is_eof: bool,
15}
16
17bitflags::bitflags! {
18 #[derive(Clone, Copy)]
19 struct Flags: u8 {
20 const NO_CACHE = 1;
21 const NO_COMPRESS = 2;
22 }
23}
24
25impl Uri {
26 pub fn new(uri: &url::Url) -> Self {
27 Self {
28 uri: uri.clone(),
29 size: None,
30 cache: None,
31 pos: 0,
32 is_eof: false,
33 }
34 }
35
36 async fn open_cached(&self, entry: &mut CacheEntryData, flags: Flags) -> Result<()>
37 {
38 use reqwest::header as H;
39 use reqwest::StatusCode as S;
40
41 if entry.is_running() {
42 return Ok(());
43 }
44
45 let client = Cache::get_client();
46
47 let mut req = entry
48 .fill_request(client.get(entry.key.clone()));
49
50 if flags.contains(Flags::NO_COMPRESS) {
51 req = req.header(H::ACCEPT_ENCODING, "identity");
52 }
53
54 let req = req.build()?;
55
56 entry.update_localtm();
57
58 let resp = client.execute(req).await?;
59
60 match resp.status() {
61 S::NOT_MODIFIED => {
62 entry.set_response(resp);
63 entry.fill_meta().await?;
64 },
65 S::OK => {
66 entry.invalidate();
67 entry.set_response(resp);
68 entry.fill_meta().await?;
69 },
70 s => {
71 return Err(Error::HttpStatus(s));
72 }
73 }
74
75 Ok(())
76 }
77
78 fn get_uri(&self) -> (std::borrow::Cow<'_, url::Url>, Flags) {
79 use std::borrow::Cow;
80
81 let mut res = Cow::Borrowed(&self.uri);
82 let mut flags = Flags::empty();
83
84 let (scheme, has_xtra) = {
85 let mut i = self.uri.scheme().split('+');
86
87 let scheme = i.next().unwrap();
88 let mut has_xtra = false;
89
90 for x in i {
91 has_xtra = true;
92
93 match x {
94 "nocache" => flags |= Flags::NO_CACHE,
95 "nocompress" => flags |= Flags::NO_COMPRESS,
96 s => warn!("unsupported scheme modifier {}", s),
97 }
98 }
99
100 (scheme, has_xtra)
101 };
102
103 if has_xtra {
104 match res.to_mut().set_scheme(scheme) {
105 Ok(_) => { },
106 Err(_) => {
107 let uri = scheme.to_string() + &res.as_str()[self.uri.scheme().len()..];
110 let uri = url::Url::parse(&uri).unwrap();
111
112 res = Cow::Owned(uri);
113 }
114 };
115 }
116
117 (res, flags)
118 }
119
120 pub async fn open(&mut self) -> Result<()>
121 {
122 let (uri, flags) = self.get_uri();
123 let entry = match flags {
124 f if f.contains(Flags::NO_CACHE) => Cache::create(&uri),
125 _ => Cache::lookup_or_create(&uri),
126 };
127
128 {
129 let mut e_locked = entry.write().await;
130
131 self.open_cached(&mut e_locked, flags).await?;
132 self.size = Some(e_locked.get_filesize().await?);
133
134 if e_locked.is_error() {
135 Cache::remove(&e_locked.key);
136 }
137 };
138
139 {
140 let e_locked = entry.read().await;
141
142 Cache::replace(&e_locked.key, &entry)
143 };
144
145 self.cache = Some(entry);
146
147 Ok(())
148 }
149
150 pub async fn get_size(&self) -> Option<u64>
151 {
152 self.size
153 }
154
155 pub async fn read<'a>(&mut self, buf: &'a mut [MaybeUninit<u8>]) -> Result<&'a [u8]>
157 {
158 assert!(!self.is_eof);
159
160 let mut entry = self.cache.as_ref().unwrap().write().await;
161
162 let len = buf.len();
163 let mut pos = 0;
164
165 while pos < len {
166 let sz = entry.read_some(self.pos, &mut buf[pos..len]).await?.len();
167
168 if sz == 0 {
169 self.is_eof = true;
170 break;
171 }
172
173 pos += sz;
174 self.pos += sz as u64;
175 }
176
177 Ok(unsafe { buf[..pos].assume_init() })
178 }
179
180 pub fn is_eof(&self) -> bool
181 {
182 self.is_eof
183 }
184}