pijul_remote/
http.rs

1use anyhow::bail;
2use libpijul::pristine::{Base32, Position};
3use libpijul::Hash;
4use log::{debug, error, trace};
5use std::collections::HashSet;
6use std::io::Write;
7use std::path::PathBuf;
8
9use crate::CS;
10use pijul_interaction::ProgressBar;
11
12const USER_AGENT: &str = concat!("pijul-", env!("CARGO_PKG_VERSION"));
13
14pub struct Http {
15    pub url: url::Url,
16    pub channel: String,
17    pub client: reqwest::Client,
18    pub name: String,
19    pub headers: Vec<(String, String)>,
20}
21
22async fn download_change(
23    client: reqwest::Client,
24    url: url::Url,
25    headers: Vec<(String, String)>,
26    mut path: PathBuf,
27    c: CS,
28) -> Result<CS, anyhow::Error> {
29    let (req, c32) = match c {
30        CS::Change(c) => {
31            libpijul::changestore::filesystem::push_filename(&mut path, &c);
32            ("change", c.to_base32())
33        }
34        CS::State(c) => {
35            libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
36            if std::fs::metadata(&path).is_ok() {
37                bail!("Tag already downloaded: {}", c.to_base32())
38            }
39            ("tag", c.to_base32())
40        }
41    };
42    tokio::fs::create_dir_all(&path.parent().unwrap())
43        .await
44        .unwrap();
45    let path_ = path.with_extension("tmp");
46    let mut f = tokio::fs::File::create(&path_).await.unwrap();
47    let url = format!("{}/{}", url, super::DOT_DIR);
48    let mut delay = 1f64;
49
50    let (send, mut recv) = tokio::sync::mpsc::channel::<Option<bytes::Bytes>>(100);
51    let t = tokio::spawn(async move {
52        use tokio::io::AsyncWriteExt;
53        debug!("waiting chunk {:?}", c);
54        while let Some(chunk) = recv.recv().await {
55            match chunk {
56                Some(chunk) => {
57                    trace!("writing {:?}", chunk.len());
58                    f.write_all(&chunk).await?;
59                }
60                None => {
61                    f.set_len(0).await?;
62                }
63            }
64            debug!("waiting chunk {:?}", c);
65        }
66        debug!("done chunk {:?}", c);
67        f.flush().await?;
68        Ok::<_, std::io::Error>(())
69    });
70
71    let mut done = false;
72    while !done {
73        let mut req = client
74            .get(&url)
75            .query(&[(req, &c32)])
76            .header(reqwest::header::USER_AGENT, USER_AGENT);
77        for (k, v) in headers.iter() {
78            debug!("kv = {:?} {:?}", k, v);
79            req = req.header(k.as_str(), v.as_str());
80        }
81        let mut res = if let Ok(res) = req.send().await {
82            delay = 1f64;
83            res
84        } else {
85            debug!("HTTP error, retrying in {} seconds", delay.round());
86            tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
87            send.send(None).await?;
88            delay *= 2.;
89            continue;
90        };
91        debug!("response {:?}", res);
92        if !res.status().is_success() {
93            tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
94            send.send(None).await?;
95            bail!("Server returned {}", res.status().as_u16())
96        }
97        let mut size: Option<usize> = res
98            .headers()
99            .get(reqwest::header::CONTENT_LENGTH)
100            .and_then(|x| x.to_str().ok())
101            .and_then(|x| x.parse().ok());
102        while !done {
103            match res.chunk().await {
104                Ok(Some(chunk)) => {
105                    if let Some(ref mut s) = size {
106                        *s -= chunk.len();
107                    }
108                    send.send(Some(chunk)).await?;
109                }
110                Ok(None) => match size {
111                    Some(0) | None => done = true,
112                    _ => break,
113                },
114                Err(e) => {
115                    debug!("error {:?}", e);
116                    error!("Error while downloading {:?} from {:?}, retrying", c32, url);
117                    send.send(None).await?;
118                    tokio::time::sleep(std::time::Duration::from_secs_f64(delay)).await;
119                    delay *= 2.;
120                    break;
121                }
122            }
123        }
124    }
125    std::mem::drop(send);
126    t.await??;
127    debug!("renaming {:?} {:?} {:?} {:?}", c, path_, path, done);
128    if done {
129        match c {
130            CS::Change(_) => {
131                tokio::fs::rename(&path_, &path).await?;
132            }
133            CS::State(_) => {
134                tokio::fs::rename(&path_, &path).await?;
135            }
136        }
137    }
138    debug!("download_change returning {:?}", c);
139    Ok(c)
140}
141
142const POOL_SIZE: usize = 20;
143
144impl Http {
145    pub async fn download_changes(
146        &mut self,
147        progress_bar: ProgressBar,
148        hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
149        send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
150        path: &PathBuf,
151        _full: bool,
152    ) -> Result<(), anyhow::Error> {
153        debug!("starting download_changes http");
154        let mut pool: [Option<tokio::task::JoinHandle<Result<CS, _>>>; POOL_SIZE] =
155            <[_; POOL_SIZE]>::default();
156        let mut cur = 0;
157        loop {
158            if let Some(t) = pool[cur].take() {
159                debug!("waiting for process {:?}", cur);
160                let c_ = t.await.unwrap().unwrap();
161                debug!("sending {:?}", c_);
162                progress_bar.inc(1);
163                if send.send((c_, true)).await.is_err() {
164                    debug!("err for {:?}", c_);
165                    break;
166                }
167                debug!("sent {:?}", c_);
168                continue;
169            }
170            let mut next = cur;
171            for i in 1..POOL_SIZE {
172                if pool[(cur + i) % POOL_SIZE].is_some() {
173                    next = (cur + i) % POOL_SIZE;
174                    break;
175                }
176            }
177            if next == cur {
178                if let Some(c) = hashes.recv().await {
179                    debug!("downloading on process {:?}: {:?}", cur, c);
180                    pool[cur] = Some(tokio::spawn(download_change(
181                        self.client.clone(),
182                        self.url.clone(),
183                        self.headers.clone(),
184                        path.clone(),
185                        c,
186                    )));
187                    cur = (cur + 1) % POOL_SIZE;
188                } else {
189                    break;
190                }
191            } else {
192                tokio::select! {
193                    c = hashes.recv() => {
194                        if let Some(c) = c {
195                            debug!("downloading on process {:?}: {:?}", cur, c);
196                            pool[cur] = Some(tokio::spawn(download_change(
197                                self.client.clone(),
198                                self.url.clone(),
199                                self.headers.clone(),
200                                path.clone(),
201                                c,
202                            )));
203                            cur = (cur + 1) % POOL_SIZE;
204                        } else {
205                            break;
206                        }
207                    }
208                    c = pool[next].as_mut().unwrap() => {
209                        pool[next] = None;
210                        let c = c??;
211                        progress_bar.inc(1);
212                        if send.send((c, true)).await.is_err() {
213                            debug!("err for {:?}", c);
214                            break;
215                        }
216                    }
217                }
218            }
219        }
220        Ok(())
221    }
222
223    pub async fn upload_changes(
224        &self,
225        progress_bar: ProgressBar,
226        mut local: PathBuf,
227        to_channel: Option<&str>,
228        changes: &[CS],
229    ) -> Result<(), anyhow::Error> {
230        for c in changes {
231            let url = {
232                let mut p = self.url.path().to_string();
233                if !p.ends_with("/") {
234                    p.push('/')
235                }
236                p.push_str(super::DOT_DIR);
237                let mut u = self.url.clone();
238                u.set_path(&p);
239                u
240            };
241            let mut to_channel = if let Some(ch) = to_channel {
242                vec![("to_channel", ch)]
243            } else {
244                Vec::new()
245            };
246            let base32;
247            let body = match c {
248                CS::Change(c) => {
249                    libpijul::changestore::filesystem::push_filename(&mut local, &c);
250                    let change = std::fs::read(&local)?;
251                    base32 = c.to_base32();
252                    to_channel.push(("apply", &base32));
253                    change
254                }
255                CS::State(c) => {
256                    libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
257                    let mut tag_file = libpijul::tag::OpenTagFile::open(&local, &c)?;
258                    let mut v = Vec::new();
259                    tag_file.short(&mut v)?;
260                    base32 = c.to_base32();
261                    to_channel.push(("tagup", &base32));
262                    v
263                }
264            };
265            libpijul::changestore::filesystem::pop_filename(&mut local);
266            debug!("url {:?} {:?}", url, to_channel);
267            let mut req = self
268                .client
269                .post(url)
270                .query(&to_channel)
271                .header(reqwest::header::USER_AGENT, USER_AGENT);
272            for (k, v) in self.headers.iter() {
273                debug!("kv = {:?} {:?}", k, v);
274                req = req.header(k.as_str(), v.as_str());
275            }
276            let resp = req.body(body).send().await?;
277            let stat = resp.status();
278            if !stat.is_success() {
279                let body = resp.text().await?;
280                if !body.is_empty() {
281                    bail!("The HTTP server returned an error: {}", body)
282                } else {
283                    if let Some(reason) = stat.canonical_reason() {
284                        bail!("HTTP Error {}: {}", stat.as_u16(), reason)
285                    } else {
286                        bail!("HTTP Error {}", stat.as_u16())
287                    }
288                }
289            }
290            progress_bar.inc(1);
291        }
292        Ok(())
293    }
294
295    pub async fn download_changelist<
296        A,
297        F: FnMut(&mut A, u64, Hash, libpijul::Merkle, bool) -> Result<(), anyhow::Error>,
298    >(
299        &self,
300        mut f: F,
301        a: &mut A,
302        from: u64,
303        paths: &[String],
304    ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
305        let url = {
306            let mut p = self.url.path().to_string();
307            if !p.ends_with("/") {
308                p.push('/')
309            }
310            p.push_str(super::DOT_DIR);
311            let mut u = self.url.clone();
312            u.set_path(&p);
313            u
314        };
315        let from_ = from.to_string();
316        let mut query = vec![("changelist", &from_), ("channel", &self.channel)];
317        for p in paths.iter() {
318            query.push(("path", p));
319        }
320        let mut req = self
321            .client
322            .get(url)
323            .query(&query)
324            .header(reqwest::header::USER_AGENT, USER_AGENT);
325        for (k, v) in self.headers.iter() {
326            debug!("kv = {:?} {:?}", k, v);
327            req = req.header(k.as_str(), v.as_str());
328        }
329        let res = req.send().await?;
330        let status = res.status();
331        if !status.is_success() {
332            match serde_json::from_slice::<libpijul::RemoteError>(&*res.bytes().await?) {
333                Ok(remote_err) => return Err(remote_err.into()),
334                Err(_) if status.as_u16() == 404 => {
335                    bail!("Repository `{}` not found (404)", self.url)
336                }
337                Err(_) => bail!("Http request failed with status code: {}", status),
338            }
339        }
340        let resp = res.bytes().await?;
341        let mut result = HashSet::new();
342        if let Ok(data) = std::str::from_utf8(&resp) {
343            for l in data.lines() {
344                debug!("l = {:?}", l);
345                if !l.is_empty() {
346                    match super::parse_line(l)? {
347                        super::ListLine::Change { n, m, h, tag } => f(a, n, h, m, tag)?,
348                        super::ListLine::Position(pos) => {
349                            result.insert(pos);
350                        }
351                        super::ListLine::Error(e) => {
352                            let mut stderr = std::io::stderr();
353                            writeln!(stderr, "{}", e)?;
354                        }
355                    }
356                } else {
357                    break;
358                }
359            }
360            debug!("done");
361        }
362        Ok(result)
363    }
364
365    pub async fn get_state(
366        &mut self,
367        mid: Option<u64>,
368    ) -> Result<Option<(u64, libpijul::Merkle, libpijul::Merkle)>, anyhow::Error> {
369        debug!("get_state {:?}", self.url);
370        let url = format!("{}/{}", self.url, super::DOT_DIR);
371        let q = if let Some(mid) = mid {
372            [
373                ("state", format!("{}", mid)),
374                ("channel", self.channel.clone()),
375            ]
376        } else {
377            [("state", String::new()), ("channel", self.channel.clone())]
378        };
379        let mut req = self
380            .client
381            .get(&url)
382            .query(&q)
383            .header(reqwest::header::USER_AGENT, USER_AGENT);
384        for (k, v) in self.headers.iter() {
385            debug!("kv = {:?} {:?}", k, v);
386            req = req.header(k.as_str(), v.as_str());
387        }
388        let res = req.send().await?;
389        if !res.status().is_success() {
390            bail!("HTTP error {:?}", res.status())
391        }
392        let resp = res.bytes().await?;
393        let resp = std::str::from_utf8(&resp)?;
394        debug!("resp = {:?}", resp);
395        let mut s = resp.split_whitespace();
396        if let (Some(n), Some(m), Some(m2)) = (
397            s.next().and_then(|s| s.parse().ok()),
398            s.next()
399                .and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
400            s.next()
401                .and_then(|m| libpijul::Merkle::from_base32(m.as_bytes())),
402        ) {
403            Ok(Some((n, m, m2)))
404        } else {
405            Ok(None)
406        }
407    }
408
409    pub async fn get_id(&self) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
410        debug!("get_state {:?}", self.url);
411        let url = format!("{}/{}", self.url, super::DOT_DIR);
412        let q = [("channel", self.channel.clone()), ("id", String::new())];
413        let mut req = self
414            .client
415            .get(&url)
416            .query(&q)
417            .header(reqwest::header::USER_AGENT, USER_AGENT);
418        for (k, v) in self.headers.iter() {
419            debug!("kv = {:?} {:?}", k, v);
420            req = req.header(k.as_str(), v.as_str());
421        }
422        let res = req.send().await?;
423        if !res.status().is_success() {
424            bail!("HTTP error {:?}", res.status())
425        }
426        let resp = res.bytes().await?;
427        debug!("resp = {:?}", resp);
428        Ok(libpijul::pristine::RemoteId::from_bytes(&resp))
429    }
430
431    pub async fn archive<W: std::io::Write + Send + 'static>(
432        &mut self,
433        prefix: Option<String>,
434        state: Option<(libpijul::Merkle, &[Hash])>,
435        mut w: W,
436    ) -> Result<u64, anyhow::Error> {
437        let url = {
438            let mut p = self.url.path().to_string();
439            if !p.ends_with("/") {
440                p.push('/')
441            }
442            p.push_str(super::DOT_DIR);
443            let mut u = self.url.clone();
444            u.set_path(&p);
445            u
446        };
447        let res = self.client.get(url).query(&[("channel", &self.channel)]);
448        let res = if let Some((ref state, ref extra)) = state {
449            let mut q = vec![("archive".to_string(), state.to_base32())];
450            if let Some(pre) = prefix {
451                q.push(("outputPrefix".to_string(), pre));
452            }
453            for e in extra.iter() {
454                q.push(("change".to_string(), e.to_base32()))
455            }
456            res.query(&q)
457        } else {
458            res
459        };
460        let res = res
461            .header(reqwest::header::USER_AGENT, USER_AGENT)
462            .send()
463            .await?;
464        if !res.status().is_success() {
465            bail!("HTTP error {:?}", res.status())
466        }
467        use futures_util::StreamExt;
468        let mut stream = res.bytes_stream();
469        let mut conflicts = 0;
470        let mut n = 0;
471        while let Some(item) = stream.next().await {
472            let item = item?;
473            let mut off = 0;
474            while n < 8 && off < item.len() {
475                conflicts = (conflicts << 8) | (item[off] as u64);
476                off += 1;
477                n += 1
478            }
479            w.write_all(&item[off..])?;
480        }
481        Ok(conflicts as u64)
482    }
483
484    pub async fn update_identities(
485        &mut self,
486        rev: Option<u64>,
487        mut path: PathBuf,
488    ) -> Result<u64, anyhow::Error> {
489        let url = {
490            let mut p = self.url.path().to_string();
491            if !p.ends_with("/") {
492                p.push('/')
493            }
494            p.push_str(super::DOT_DIR);
495            let mut u = self.url.clone();
496            u.set_path(&p);
497            u
498        };
499        let mut req = self
500            .client
501            .get(url)
502            .query(&[(
503                "identities",
504                if let Some(rev) = rev {
505                    rev.to_string()
506                } else {
507                    0u32.to_string()
508                },
509            )])
510            .header(reqwest::header::USER_AGENT, USER_AGENT);
511        for (k, v) in self.headers.iter() {
512            debug!("kv = {:?} {:?}", k, v);
513            req = req.header(k.as_str(), v.as_str());
514        }
515        let res = req.send().await?;
516        if !res.status().is_success() {
517            bail!("HTTP error {:?}", res.status())
518        }
519        use serde_derive::*;
520        #[derive(Debug, Deserialize)]
521        struct Identities {
522            id: Vec<pijul_identity::Complete>,
523            rev: u64,
524        }
525        let resp: Option<Identities> = res.json().await?;
526
527        if let Some(resp) = resp {
528            std::fs::create_dir_all(&path)?;
529            for id in resp.id.iter() {
530                path.push(&id.public_key.key);
531                debug!("recv identity: {:?} {:?}", id, path);
532                let mut id_file = std::fs::File::create(&path)?;
533                serde_json::to_writer_pretty(&mut id_file, &id.as_portable())?;
534                path.pop();
535            }
536            Ok(resp.rev)
537        } else {
538            Ok(0)
539        }
540    }
541
542    pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
543        debug!("prove {:?}", self.url);
544        let url = format!("{}/{}", self.url, super::DOT_DIR);
545        let q = [("challenge", key.public_key().key)];
546        let mut req = self
547            .client
548            .get(&url)
549            .query(&q)
550            .header(reqwest::header::USER_AGENT, USER_AGENT);
551        for (k, v) in self.headers.iter() {
552            debug!("kv = {:?} {:?}", k, v);
553            req = req.header(k.as_str(), v.as_str());
554        }
555        let res = req.send().await?;
556        if !res.status().is_success() {
557            bail!("HTTP error {:?}", res.status())
558        }
559        let resp = res.bytes().await?;
560        debug!("resp = {:?}", resp);
561
562        let sig = key.sign_raw(&resp)?;
563        debug!("sig = {:?}", sig);
564        let q = [("prove", &sig)];
565        let mut req = self
566            .client
567            .get(&url)
568            .query(&q)
569            .header(reqwest::header::USER_AGENT, USER_AGENT);
570        for (k, v) in self.headers.iter() {
571            debug!("kv = {:?} {:?}", k, v);
572            req = req.header(k.as_str(), v.as_str());
573        }
574        let res = req.send().await?;
575        if !res.status().is_success() {
576            bail!("HTTP error {:?}", res.status())
577        }
578
579        Ok(())
580    }
581}