1use anyhow::bail;
2use libpijul::pristine::{Base32, Position};
3use libpijul::Hash;
4use log::{debug, error, trace};
5use std::collections::HashSet;
6use std::io::Write;
7use std::path::PathBuf;
8
9use crate::CS;
10use pijul_interaction::ProgressBar;
11
12const USER_AGENT: &str = concat!("pijul-", env!("CARGO_PKG_VERSION"));
13
14pub struct Http {
15 pub url: url::Url,
16 pub channel: String,
17 pub client: reqwest::Client,
18 pub name: String,
19 pub headers: Vec<(String, String)>,
20}
21
22async fn download_change(
23 client: reqwest::Client,
24 url: url::Url,
25 headers: Vec<(String, String)>,
26 mut path: PathBuf,
27 c: CS,
28) -> Result<CS, anyhow::Error> {
29 let (req, c32) = match c {
30 CS::Change(c) => {
31 libpijul::changestore::filesystem::push_filename(&mut path, &c);
32 ("change", c.to_base32())
33 }
34 CS::State(c) => {
35 libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
36 if std::fs::metadata(&path).is_ok() {
37 bail!("Tag already downloaded: {}", c.to_base32())
38 }
39 ("tag", c.to_base32())
40 }
41 };
42 tokio::fs::create_dir_all(&path.parent().unwrap())
43 .await
44 .unwrap();
45 let path_ = path.with_extension("tmp");
46 let mut f = tokio::fs::File::create(&path_).await.unwrap();
47 let url = format!("{}/{}", url, super::DOT_DIR);
48 let mut delay = 1f64;
49
50 let (send, mut recv) = tokio::sync::mpsc::channel::<Option<bytes::Bytes>>(100);
51 let t = tokio::spawn(async move {
52 use tokio::io::AsyncWriteExt;
53 debug!("waiting chunk {:?}", c);
54 while let Some(chunk) = recv.recv().await {
55 match chunk {
56 Some(chunk) => {
57 trace!("writing {:?}", chunk.len());
58 f.write_all(&chunk).await?;
59 }
60 None => {
61 f.set_len(0).await?;
62 }
63 }
64 debug!("waiting chunk {:?}", c);
65 }
66 debug!("done chunk {:?}", c);
67 f.flush().await?;
68 Ok::<_, std::io::Error>(())
69 });
70
71 let mut done = false;
72 while !done {
73 let mut req = client
74 .get(&url)
75 .query(&[(req, &c32)])
76 .header(reqwest::header::USER_AGENT, USER_AGENT);
77 for (k, v) in headers.iter() {
78 debug!("kv = {:?} {:?}", k, v);
79 req = req.header(k.as_str(), v.as_str());
80 }
81 let mut res = if let Ok(res) = req.send().await {
82 delay = 1f64;
83 res
84 } else {
85 debug!("HTTP error, retrying in {} seconds", delay.round());
86 tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
87 send.send(None).await?;
88 delay *= 2.;
89 continue;
90 };
91 debug!("response {:?}", res);
92 if !res.status().is_success() {
93 tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
94 send.send(None).await?;
95 bail!("Server returned {}", res.status().as_u16())
96 }
97 let mut size: Option<usize> = res
98 .headers()
99 .get(reqwest::header::CONTENT_LENGTH)
100 .and_then(|x| x.to_str().ok())
101 .and_then(|x| x.parse().ok());
102 while !done {
103 match res.chunk().await {
104 Ok(Some(chunk)) => {
105 if let Some(ref mut s) = size {
106 *s -= chunk.len();
107 }
108 send.send(Some(chunk)).await?;
109 }
110 Ok(None) => match size {
111 Some(0) | None => done = true,
112 _ => break,
113 },
114 Err(e) => {
115 debug!("error {:?}", e);
116 error!("Error while downloading {:?} from {:?}, retrying", c32, url);
117 send.send(None).await?;
118 tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
119 delay *= 2.;
120 break;
121 }
122 }
123 }
124 }
125 std::mem::drop(send);
126 t.await??;
127 debug!("renaming {:?} {:?} {:?} {:?}", c, path_, path, done);
128 if done {
129 match c {
130 CS::Change(_) => {
131 tokio::fs::rename(&path_, &path).await?;
132 }
133 CS::State(_) => {
134 tokio::fs::rename(&path_, &path).await?;
135 }
136 }
137 }
138 debug!("download_change returning {:?}", c);
139 Ok(c)
140}
141
/// Number of slots in the pool of download tasks kept by `Http::download_changes`.
const POOL_SIZE: usize = 20;
143
144impl Http {
145 pub async fn download_changes(
146 &mut self,
147 progress_bar: ProgressBar,
148 hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
149 send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
150 path: &PathBuf,
151 _full: bool,
152 ) -> Result<(), anyhow::Error> {
153 debug!("starting download_changes http");
154 let mut pool: [Option<tokio::task::JoinHandle<Result<CS, _>>>; POOL_SIZE] =
155 <[_; POOL_SIZE]>::default();
156 let mut cur = 0;
157 loop {
158 if let Some(t) = pool[cur].take() {
159 debug!("waiting for process {:?}", cur);
160 let c_ = t.await.unwrap().unwrap();
161 debug!("sending {:?}", c_);
162 progress_bar.inc(1);
163 if send.send((c_, true)).await.is_err() {
164 debug!("err for {:?}", c_);
165 break;
166 }
167 debug!("sent {:?}", c_);
168 continue;
169 }
170 let mut next = cur;
171 for i in 1..POOL_SIZE {
172 if pool[(cur + i) % POOL_SIZE].is_some() {
173 next = (cur + i) % POOL_SIZE;
174 break;
175 }
176 }
177 if next == cur {
178 if let Some(c) = hashes.recv().await {
179 debug!("downloading on process {:?}: {:?}", cur, c);
180 pool[cur] = Some(tokio::spawn(download_change(
181 self.client.clone(),
182 self.url.clone(),
183 self.headers.clone(),
184 path.clone(),
185 c,
186 )));
187 cur = (cur + 1) % POOL_SIZE;
188 } else {
189 break;
190 }
191 } else {
192 tokio::select! {
193 c = hashes.recv() => {
194 if let Some(c) = c {
195 debug!("downloading on process {:?}: {:?}", cur, c);
196 pool[cur] = Some(tokio::spawn(download_change(
197 self.client.clone(),
198 self.url.clone(),
199 self.headers.clone(),
200 path.clone(),
201 c,
202 )));
203 cur = (cur + 1) % POOL_SIZE;
204 } else {
205 break;
206 }
207 }
208 c = pool[next].as_mut().unwrap() => {
209 pool[next] = None;
210 let c = c??;
211 progress_bar.inc(1);
212 if send.send((c, true)).await.is_err() {
213 debug!("err for {:?}", c);
214 break;
215 }
216 }
217 }
218 }
219 }
220 Ok(())
221 }
222
223 pub async fn upload_changes(
224 &self,
225 progress_bar: ProgressBar,
226 mut local: PathBuf,
227 to_channel: Option<&str>,
228 changes: &[CS],
229 ) -> Result<(), anyhow::Error> {
230 for c in changes {
231 let url = {
232 let mut p = self.url.path().to_string();
233 if !p.ends_with("/") {
234 p.push('/')
235 }
236 p.push_str(super::DOT_DIR);
237 let mut u = self.url.clone();
238 u.set_path(&p);
239 u
240 };
241 let mut to_channel = if let Some(ch) = to_channel {
242 vec![("to_channel", ch)]
243 } else {
244 Vec::new()
245 };
246 let base32;
247 let body = match c {
248 CS::Change(c) => {
249 libpijul::changestore::filesystem::push_filename(&mut local, &c);
250 let change = std::fs::read(&local)?;
251 base32 = c.to_base32();
252 to_channel.push(("apply", &base32));
253 change
254 }
255 CS::State(c) => {
256 libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
257 let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;
258 let mut v = Vec::new();
259 tag_file.short(&mut v)?;
260 base32 = c.to_base32();
261 to_channel.push(("tagup", &base32));
262 v
263 }
264 };
265 libpijul::changestore::filesystem::pop_filename(&mut local);
266 debug!("url {:?} {:?}", url, to_channel);
267 let mut req = self
268 .client
269 .post(url)
270 .query(&to_channel)
271 .header(reqwest::header::USER_AGENT, USER_AGENT);
272 for (k, v) in self.headers.iter() {
273 debug!("kv = {:?} {:?}", k, v);
274 req = req.header(k.as_str(), v.as_str());
275 }
276 let resp = req.body(body).send().await?;
277 let stat = resp.status();
278 if !stat.is_success() {
279 let body = resp.text().await?;
280 if !body.is_empty() {
281 bail!("The HTTP server returned an error: {}", body)
282 } else {
283 if let Some(reason) = stat.canonical_reason() {
284 bail!("HTTP Error {}: {}", stat.as_u16(), reason)
285 } else {
286 bail!("HTTP Error {}", stat.as_u16())
287 }
288 }
289 }
290 progress_bar.inc(1);
291 }
292 Ok(())
293 }
294
295 pub async fn download_changelist<
296 A,
297 F: FnMut(&mut A, u64, Hash, libpijul::Merkle, bool) -> Result<(), anyhow::Error>,
298 >(
299 &self,
300 mut f: F,
301 a: &mut A,
302 from: u64,
303 paths: &[String],
304 ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
305 let url = {
306 let mut p = self.url.path().to_string();
307 if !p.ends_with("/") {
308 p.push('/')
309 }
310 p.push_str(super::DOT_DIR);
311 let mut u = self.url.clone();
312 u.set_path(&p);
313 u
314 };
315 let from_ = from.to_string();
316 let mut query = vec![("changelist", &from_), ("channel", &self.channel)];
317 for p in paths.iter() {
318 query.push(("path", p));
319 }
320 let mut req = self
321 .client
322 .get(url)
323 .query(&query)
324 .header(reqwest::header::USER_AGENT, USER_AGENT);
325 for (k, v) in self.headers.iter() {
326 debug!("kv = {:?} {:?}", k, v);
327 req = req.header(k.as_str(), v.as_str());
328 }
329 let res = req.send().await?;
330 let status = res.status();
331 if !status.is_success() {
332 match serde_json::from_slice::<libpijul::RemoteError>(&*res.bytes().await?) {
333 Ok(remote_err) => return Err(remote_err.into()),
334 Err(_) if status.as_u16() == 404 => {
335 bail!("Repository `{}` not found (404)", self.url)
336 }
337 Err(_) => bail!("Http request failed with status code: {}", status),
338 }
339 }
340 let resp = res.bytes().await?;
341 let mut result = HashSet::new();
342 if let Ok(data) = std::str::from_utf8(&resp) {
343 for l in data.lines() {
344 debug!("l = {:?}", l);
345 if !l.is_empty() {
346 match super::parse_line(l)? {
347 super::ListLine::Change { n, m, h, tag } => f(a, n, h, m, tag)?,
348 super::ListLine::Position(pos) => {
349 result.insert(pos);
350 }
351 super::ListLine::Error(e) => {
352 let mut stderr = std::io::stderr();
353 writeln!(stderr, "{}", e)?;
354 }
355 }
356 } else {
357 break;
358 }
359 }
360 debug!("done");
361 }
362 Ok(result)
363 }
364
365 pub async fn get_state(
366 &mut self,
367 mid: Option<u64>,
368 ) -> Result<Option<(u64, libpijul::Merkle, libpijul::Merkle)>, anyhow::Error> {
369 debug!("get_state {:?}", self.url);
370 let url = format!("{}/{}", self.url, super::DOT_DIR);
371 let q = if let Some(mid) = mid {
372 [
373 ("state", format!("{}", mid)),
374 ("channel", self.channel.clone()),
375 ]
376 } else {
377 [("state", String::new()), ("channel", self.channel.clone())]
378 };
379 let mut req = self
380 .client
381 .get(&url)
382 .query(&q)
383 .header(reqwest::header::USER_AGENT, USER_AGENT);
384 for (k, v) in self.headers.iter() {
385 debug!("kv = {:?} {:?}", k, v);
386 req = req.header(k.as_str(), v.as_str());
387 }
388 let res = req.send().await?;
389 if !res.status().is_success() {
390 bail!("HTTP error {:?}", res.status())
391 }
392 let resp = res.bytes().await?;
393 let resp = std::str::from_utf8(&resp)?;
394 debug!("resp = {:?}", resp);
395 let mut s = resp.split_whitespace();
396 if let (Some(n), Some(m), Some(m2)) = (
397 s.next().and_then(|s| s.parse().ok()),
398 s.next()
399 .and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
400 s.next()
401 .and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
402 ) {
403 Ok(Some((n, m, m2)))
404 } else {
405 Ok(None)
406 }
407 }
408
409 pub async fn get_id(&self) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
410 debug!("get_state {:?}", self.url);
411 let url = format!("{}/{}", self.url, super::DOT_DIR);
412 let q = [("channel", self.channel.clone()), ("id", String::new())];
413 let mut req = self
414 .client
415 .get(&url)
416 .query(&q)
417 .header(reqwest::header::USER_AGENT, USER_AGENT);
418 for (k, v) in self.headers.iter() {
419 debug!("kv = {:?} {:?}", k, v);
420 req = req.header(k.as_str(), v.as_str());
421 }
422 let res = req.send().await?;
423 if !res.status().is_success() {
424 bail!("HTTP error {:?}", res.status())
425 }
426 let resp = res.bytes().await?;
427 debug!("resp = {:?}", resp);
428 Ok(libpijul::pristine::RemoteId::from_bytes(&resp))
429 }
430
431 pub async fn archive<W: std::io::Write + Send + 'static>(
432 &mut self,
433 prefix: Option<String>,
434 state: Option<(libpijul::Merkle, &[Hash])>,
435 mut w: W,
436 ) -> Result<u64, anyhow::Error> {
437 let url = {
438 let mut p = self.url.path().to_string();
439 if !p.ends_with("/") {
440 p.push('/')
441 }
442 p.push_str(super::DOT_DIR);
443 let mut u = self.url.clone();
444 u.set_path(&p);
445 u
446 };
447 let res = self.client.get(url).query(&[("channel", &self.channel)]);
448 let res = if let Some((ref state, ref extra)) = state {
449 let mut q = vec![("archive".to_string(), state.to_base32())];
450 if let Some(pre) = prefix {
451 q.push(("outputPrefix".to_string(), pre));
452 }
453 for e in extra.iter() {
454 q.push(("change".to_string(), e.to_base32()))
455 }
456 res.query(&q)
457 } else {
458 res
459 };
460 let res = res
461 .header(reqwest::header::USER_AGENT, USER_AGENT)
462 .send()
463 .await?;
464 if !res.status().is_success() {
465 bail!("HTTP error {:?}", res.status())
466 }
467 use futures_util::StreamExt;
468 let mut stream = res.bytes_stream();
469 let mut conflicts = 0;
470 let mut n = 0;
471 while let Some(item) = stream.next().await {
472 let item = item?;
473 let mut off = 0;
474 while n < 8 && off < item.len() {
475 conflicts = (conflicts << 8) | (item[off] as u64);
476 off += 1;
477 n += 1
478 }
479 w.write_all(&item[off..])?;
480 }
481 Ok(conflicts as u64)
482 }
483
484 pub async fn update_identities(
485 &mut self,
486 rev: Option<u64>,
487 mut path: PathBuf,
488 ) -> Result<u64, anyhow::Error> {
489 let url = {
490 let mut p = self.url.path().to_string();
491 if !p.ends_with("/") {
492 p.push('/')
493 }
494 p.push_str(super::DOT_DIR);
495 let mut u = self.url.clone();
496 u.set_path(&p);
497 u
498 };
499 let mut req = self
500 .client
501 .get(url)
502 .query(&[(
503 "identities",
504 if let Some(rev) = rev {
505 rev.to_string()
506 } else {
507 0u32.to_string()
508 },
509 )])
510 .header(reqwest::header::USER_AGENT, USER_AGENT);
511 for (k, v) in self.headers.iter() {
512 debug!("kv = {:?} {:?}", k, v);
513 req = req.header(k.as_str(), v.as_str());
514 }
515 let res = req.send().await?;
516 if !res.status().is_success() {
517 bail!("HTTP error {:?}", res.status())
518 }
519 use serde_derive::*;
520 #[derive(Debug, Deserialize)]
521 struct Identities {
522 id: Vec<pijul_identity::Complete>,
523 rev: u64,
524 }
525 let resp: Option<Identities> = res.json().await?;
526
527 if let Some(resp) = resp {
528 std::fs::create_dir_all(&path)?;
529 for id in resp.id.iter() {
530 path.push(&id.public_key.key);
531 debug!("recv identity: {:?} {:?}", id, path);
532 let mut id_file = std::fs::File::create(&path)?;
533 serde_json::to_writer_pretty(&mut id_file, &id.as_portable())?;
534 path.pop();
535 }
536 Ok(resp.rev)
537 } else {
538 Ok(0)
539 }
540 }
541
542 pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
543 debug!("prove {:?}", self.url);
544 let url = format!("{}/{}", self.url, super::DOT_DIR);
545 let q = [("challenge", key.public_key().key)];
546 let mut req = self
547 .client
548 .get(&url)
549 .query(&q)
550 .header(reqwest::header::USER_AGENT, USER_AGENT);
551 for (k, v) in self.headers.iter() {
552 debug!("kv = {:?} {:?}", k, v);
553 req = req.header(k.as_str(), v.as_str());
554 }
555 let res = req.send().await?;
556 if !res.status().is_success() {
557 bail!("HTTP error {:?}", res.status())
558 }
559 let resp = res.bytes().await?;
560 debug!("resp = {:?}", resp);
561
562 let sig = key.sign_raw(&resp)?;
563 debug!("sig = {:?}", sig);
564 let q = [("prove", &sig)];
565 let mut req = self
566 .client
567 .get(&url)
568 .query(&q)
569 .header(reqwest::header::USER_AGENT, USER_AGENT);
570 for (k, v) in self.headers.iter() {
571 debug!("kv = {:?} {:?}", k, v);
572 req = req.header(k.as_str(), v.as_str());
573 }
574 let res = req.send().await?;
575 if !res.status().is_success() {
576 bail!("HTTP error {:?}", res.status())
577 }
578
579 Ok(())
580 }
581}