r_tftpd_proxy/
uri.rs

1use std::mem::MaybeUninit;
2
3use crate::{ Error, Result };
4use crate::util::AsInit;
5
6use super::{ Cache, CacheEntry, CacheEntryData };
7
8#[derive(Debug)]
9pub struct Uri {
10    uri:	url::Url,
11    size:	Option<u64>,
12    cache:	Option<CacheEntry>,
13    pos:	u64,
14    is_eof:	bool,
15}
16
17bitflags::bitflags! {
18    #[derive(Clone, Copy)]
19    struct Flags: u8 {
20	const NO_CACHE = 1;
21	const NO_COMPRESS = 2;
22    }
23}
24
25impl Uri {
26    pub fn new(uri: &url::Url) -> Self {
27	Self {
28	    uri:	uri.clone(),
29	    size:	None,
30	    cache:	None,
31	    pos:	0,
32	    is_eof:	false,
33	}
34    }
35
36    async fn open_cached(&self, entry: &mut CacheEntryData, flags: Flags) -> Result<()>
37    {
38	use reqwest::header as H;
39	use reqwest::StatusCode as S;
40
41	if entry.is_running() {
42	    return Ok(());
43	}
44
45	let client = Cache::get_client();
46
47	let mut req = entry
48	    .fill_request(client.get(entry.key.clone()));
49
50	if flags.contains(Flags::NO_COMPRESS) {
51	    req = req.header(H::ACCEPT_ENCODING, "identity");
52	}
53
54	let req = req.build()?;
55
56	entry.update_localtm();
57
58	let resp = client.execute(req).await?;
59
60	match resp.status() {
61	    S::NOT_MODIFIED	=> {
62		entry.set_response(resp);
63		entry.fill_meta().await?;
64	    },
65	    S::OK		=> {
66		entry.invalidate();
67		entry.set_response(resp);
68		entry.fill_meta().await?;
69	    },
70	    s			=> {
71		return Err(Error::HttpStatus(s));
72	    }
73	}
74
75	Ok(())
76    }
77
78    fn get_uri(&self) -> (std::borrow::Cow<'_, url::Url>, Flags) {
79	use std::borrow::Cow;
80
81	let mut res = Cow::Borrowed(&self.uri);
82	let mut flags = Flags::empty();
83
84	let (scheme, has_xtra) = {
85	    let mut i = self.uri.scheme().split('+');
86
87	    let scheme = i.next().unwrap();
88	    let mut has_xtra = false;
89
90	    for x in i {
91		has_xtra = true;
92
93		match x {
94		    "nocache"		=> flags |= Flags::NO_CACHE,
95		    "nocompress"	=> flags |= Flags::NO_COMPRESS,
96		    s			=> warn!("unsupported scheme modifier {}", s),
97		}
98	    }
99
100	    (scheme, has_xtra)
101	};
102
103	if has_xtra {
104	    match res.to_mut().set_scheme(scheme) {
105		Ok(_)	=> { },
106		Err(_)	=> {
107		    // 'url' crate does not allow rewriting non-standard
108		    // 'http+nocached' to standard 'http' scheme
109		    let uri = scheme.to_string() + &res.as_str()[self.uri.scheme().len()..];
110		    let uri = url::Url::parse(&uri).unwrap();
111
112		    res = Cow::Owned(uri);
113		}
114	    };
115	}
116
117	(res, flags)
118    }
119
120    pub async fn open(&mut self) -> Result<()>
121    {
122	let (uri, flags) = self.get_uri();
123	let entry = match flags {
124	    f if f.contains(Flags::NO_CACHE)	=> Cache::create(&uri),
125	    _					=> Cache::lookup_or_create(&uri),
126	};
127
128	{
129	    let mut e_locked = entry.write().await;
130
131	    self.open_cached(&mut e_locked, flags).await?;
132	    self.size = Some(e_locked.get_filesize().await?);
133
134	    if e_locked.is_error() {
135		Cache::remove(&e_locked.key);
136	    }
137	};
138
139	{
140	    let e_locked = entry.read().await;
141
142	    Cache::replace(&e_locked.key, &entry)
143	};
144
145	self.cache = Some(entry);
146
147	Ok(())
148    }
149
150    pub async fn get_size(&self) -> Option<u64>
151    {
152	self.size
153    }
154
155    //#[instrument(level = "trace", skip_all, ret)]
156    pub async fn read<'a>(&mut self, buf: &'a mut [MaybeUninit<u8>]) -> Result<&'a [u8]>
157    {
158	assert!(!self.is_eof);
159
160	let mut entry = self.cache.as_ref().unwrap().write().await;
161
162	let len = buf.len();
163	let mut pos = 0;
164
165	while pos < len {
166	    let sz = entry.read_some(self.pos, &mut buf[pos..len]).await?.len();
167
168	    if sz == 0 {
169		self.is_eof = true;
170		break;
171	    }
172
173	    pos += sz;
174	    self.pos += sz as u64;
175	}
176
177	Ok(unsafe { buf[..pos].assume_init() })
178    }
179
180    pub fn is_eof(&self) -> bool
181    {
182	self.is_eof
183    }
184}