pijul_remote/
lib.rs

1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use anyhow::{bail, Context};
7use async_trait::async_trait;
8use lazy_static::lazy_static;
9use libpijul::pristine::{
10    sanakirja::MutTxn, Base32, ChangeId, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef,
11    TxnT,
12};
13use libpijul::DOT_DIR;
14use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
15use log::{debug, info};
16
17use pijul_config::*;
18use pijul_identity::Complete;
19use pijul_repository::*;
20
21pub mod ssh;
22use ssh::*;
23
24pub mod local;
25use local::*;
26
27pub mod http;
28use http::*;
29
30use pijul_interaction::{
31    ProgressBar, Spinner, APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, UPLOAD_MESSAGE,
32};
33
34pub const PROTOCOL_VERSION: usize = 3;
35
36pub enum RemoteRepo {
37    Local(Local),
38    Ssh(Ssh),
39    Http(Http),
40    LocalChannel(String),
41    None,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45pub enum CS {
46    Change(Hash),
47    State(Merkle),
48}
49
50pub async fn repository(
51    repo: &Repository,
52    self_path: Option<&Path>,
53    // User name in case it isn't provided in the `name` argument already.
54    user: Option<&str>,
55    name: &str,
56    channel: &str,
57    no_cert_check: bool,
58    with_path: bool,
59) -> Result<RemoteRepo, anyhow::Error> {
60    if let Some(name) = repo.config.remotes.iter().find(|e| e.name() == name) {
61        name.to_remote(channel, no_cert_check, with_path).await
62    } else {
63        unknown_remote(self_path, user, name, channel, no_cert_check, with_path).await
64    }
65}
66
67/// Associate a generated key with a remote identity. Patches authored
68/// by unproven keys will only display the key as the author.
69pub async fn prove(
70    identity: &Complete,
71    origin: Option<&str>,
72    no_cert_check: bool,
73) -> Result<(), anyhow::Error> {
74    let remote = origin.unwrap_or(&identity.config.author.origin);
75    let mut stderr = std::io::stderr();
76    writeln!(
77        stderr,
78        "Linking identity `{}` with {}@{}",
79        &identity.name, &identity.config.author.username, remote
80    )?;
81
82    let mut remote = if let Ok(repo) = Repository::find_root(None) {
83        repository(
84            &repo,
85            None,
86            Some(&identity.config.author.username),
87            &remote,
88            libpijul::DEFAULT_CHANNEL,
89            no_cert_check,
90            false,
91        )
92        .await?
93    } else {
94        unknown_remote(
95            None,
96            Some(&identity.config.author.username),
97            &remote,
98            libpijul::DEFAULT_CHANNEL,
99            no_cert_check,
100            false,
101        )
102        .await?
103    };
104
105    let (key, _password) = identity
106        .credentials
107        .clone()
108        .unwrap()
109        .decrypt(&identity.name)?;
110    remote.prove(key).await?;
111
112    Ok(())
113}
114
115#[async_trait]
116pub trait ToRemote {
117    async fn to_remote(
118        &self,
119        channel: &str,
120        no_cert_check: bool,
121        with_path: bool,
122    ) -> Result<RemoteRepo, anyhow::Error>;
123}
124
125#[async_trait]
126impl ToRemote for RemoteConfig {
127    async fn to_remote(
128        &self,
129        channel: &str,
130        no_cert_check: bool,
131        with_path: bool,
132    ) -> Result<RemoteRepo, anyhow::Error> {
133        match self {
134            RemoteConfig::Ssh { ssh, .. } => {
135                if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
136                    debug!("unknown_remote, ssh = {:?}", ssh);
137                    if let Some(c) = sshr.connect(ssh, channel).await? {
138                        return Ok(RemoteRepo::Ssh(c));
139                    }
140                }
141                bail!("Remote not found: {:?}", ssh)
142            }
143            RemoteConfig::Http {
144                http,
145                headers,
146                name,
147            } => {
148                let mut h = Vec::new();
149                for (k, v) in headers.iter() {
150                    match v {
151                        RemoteHttpHeader::String(s) => {
152                            h.push((k.clone(), s.clone()));
153                        }
154                        RemoteHttpHeader::Shell(shell) => {
155                            h.push((k.clone(), shell_cmd(&shell.shell)?));
156                        }
157                    }
158                }
159                return Ok(RemoteRepo::Http(Http {
160                    url: http.parse().unwrap(),
161                    channel: channel.to_string(),
162                    client: reqwest::ClientBuilder::new()
163                        .danger_accept_invalid_certs(no_cert_check)
164                        .build()?,
165                    headers: h,
166                    name: name.to_string(),
167                }));
168            }
169        }
170    }
171}
172
173pub async fn unknown_remote(
174    self_path: Option<&Path>,
175    user: Option<&str>,
176    name: &str,
177    channel: &str,
178    no_cert_check: bool,
179    with_path: bool,
180) -> Result<RemoteRepo, anyhow::Error> {
181    if let Ok(url) = url::Url::parse(name) {
182        let scheme = url.scheme();
183        if scheme == "http" || scheme == "https" {
184            debug!("unknown_remote, http = {:?}", name);
185            return Ok(RemoteRepo::Http(Http {
186                url,
187                channel: channel.to_string(),
188                client: reqwest::ClientBuilder::new()
189                    .danger_accept_invalid_certs(no_cert_check)
190                    .build()?,
191                headers: Vec::new(),
192                name: name.to_string(),
193            }));
194        } else if scheme == "ssh" {
195            if let Some(mut ssh) = ssh_remote(user, name, with_path) {
196                debug!("unknown_remote, ssh = {:?}", ssh);
197                if let Some(c) = ssh.connect(name, channel).await? {
198                    return Ok(RemoteRepo::Ssh(c));
199                }
200            }
201            bail!("Remote not found: {:?}", name)
202        } else {
203            bail!("Remote scheme not supported: {:?}", scheme)
204        }
205    }
206    if let Ok(root) = std::fs::canonicalize(name) {
207        if let Some(path) = self_path {
208            let path = std::fs::canonicalize(path)?;
209            if path == root {
210                return Ok(RemoteRepo::LocalChannel(channel.to_string()));
211            }
212        }
213
214        let mut dot_dir = root.join(DOT_DIR);
215        let changes_dir = dot_dir.join(CHANGES_DIR);
216
217        dot_dir.push(PRISTINE_DIR);
218        debug!("dot_dir = {:?}", dot_dir);
219        match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
220            Ok(pristine) => {
221                debug!("pristine done");
222                return Ok(RemoteRepo::Local(Local {
223                    root: Path::new(name).to_path_buf(),
224                    channel: channel.to_string(),
225                    changes_dir,
226                    pristine: Arc::new(pristine),
227                    name: name.to_string(),
228                }));
229            }
230            Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
231                sanakirja::Error::IO(e),
232            )) if e.kind() == std::io::ErrorKind::NotFound => {
233                debug!("repo not found")
234            }
235            Err(e) => return Err(e.into()),
236        }
237    }
238    if let Some(mut ssh) = ssh_remote(user, name, with_path) {
239        debug!("unknown_remote, ssh = {:?}", ssh);
240        if let Some(c) = ssh.connect(name, channel).await? {
241            return Ok(RemoteRepo::Ssh(c));
242        }
243    }
244    bail!("Remote not found: {:?}", name)
245}
246
247// Extracting this saves a little bit of duplication.
248pub fn get_local_inodes(
249    txn: &mut MutTxn<()>,
250    channel: &ChannelRef<MutTxn<()>>,
251    repo: &Repository,
252    path: &[String],
253) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
254    let mut paths = HashSet::new();
255    for path in path.iter() {
256        let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
257        if ambiguous {
258            bail!("Ambiguous path: {:?}", path)
259        }
260        paths.insert(p);
261        paths.extend(
262            libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
263        );
264    }
265    Ok(paths)
266}
267
268/// Embellished [`RemoteDelta`] that has information specific
269/// to a push operation. We want to know what our options are
270/// for changes to upload, whether the remote has unrecorded relevant changes,
271/// and whether the remote has changes we don't know about, since those might
272/// effect whether or not we actually want to go through with the push.
273pub struct PushDelta {
274    pub to_upload: Vec<CS>,
275    pub remote_unrecs: Vec<(u64, CS)>,
276    pub unknown_changes: Vec<CS>,
277}
278
279/// For a [`RemoteRepo`] that's Local, Ssh, or Http
280/// (anything other than a LocalChannel),
281/// [`RemoteDelta`] contains data about the difference between
282/// the "actual" state of the remote ('theirs') and the last version of it
283/// that we cached ('ours'). The dichotomy is the last point at which the two
284/// were the same. `remote_unrecs` is a list of changes which used to be
285/// present in the remote, AND were present in the current channel we're
286/// pulling to or pushing from. The significance of that is that if we knew
287/// about a certain change but did not pull it, the user won't be notified
288/// if it's unrecorded in the remote.
289///
290/// If the remote we're pulling from or pushing to is a LocalChannel,
291/// (meaning it's just a different channel of the repo we're already in), then
292/// `ours_ge_dichotomy`, `theirs_ge_dichotomy`, and `remote_unrecs` will be empty
293/// since they have no meaning. If we're pulling from a LocalChannel,
294/// there's no cache to have diverged from, and if we're pushing to a LocalChannel,
295/// we're not going to suddenly be surprised by the presence of unknown changes.
296///
297/// This struct will be created by both a push and pull operation since both
298/// need to update the changelist and will at least try to update the local
299/// remote cache. For a push, this later gets turned into [`PushDelta`].
300pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
301    pub inodes: HashSet<Position<Hash>>,
302    pub to_download: Vec<CS>,
303    pub remote_ref: Option<RemoteRef<T>>,
304    pub ours_ge_dichotomy_set: HashSet<CS>,
305    pub theirs_ge_dichotomy_set: HashSet<CS>,
306    // Keep the Vec representation around as well so that notification
307    // for unknown changes during shows the hashes in order.
308    pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
309    pub remote_unrecs: Vec<(u64, CS)>,
310}
311
312impl RemoteDelta<MutTxn<()>> {
313    /// Make a [`PushDelta`] from a [`RemoteDelta`]
314    /// when the remote is a [`RemoteRepo::LocalChannel`].
315    pub fn to_local_channel_push(
316        self,
317        remote_channel: &str,
318        txn: &mut MutTxn<()>,
319        path: &[String],
320        channel: &ChannelRef<MutTxn<()>>,
321        repo: &Repository,
322    ) -> Result<PushDelta, anyhow::Error> {
323        let mut to_upload = Vec::new();
324        let inodes = get_local_inodes(txn, channel, repo, path)?;
325
326        for x in txn.reverse_log(&*channel.read(), None)? {
327            let (_, (h, _)) = x?;
328            if let Some(channel) = txn.load_channel(remote_channel)? {
329                let channel = channel.read();
330                let h_int = txn.get_internal(h)?.unwrap();
331                if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
332                    if inodes.is_empty() {
333                        to_upload.push(CS::Change(h.into()))
334                    } else {
335                        for p in inodes.iter() {
336                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
337                                to_upload.push(CS::Change(h.into()));
338                                break;
339                            }
340                        }
341                    }
342                }
343            }
344        }
345        assert!(self.ours_ge_dichotomy_set.is_empty());
346        assert!(self.theirs_ge_dichotomy_set.is_empty());
347        let d = PushDelta {
348            to_upload: to_upload.into_iter().rev().collect(),
349            remote_unrecs: self.remote_unrecs,
350            unknown_changes: Vec::new(),
351        };
352        assert!(d.remote_unrecs.is_empty());
353        Ok(d)
354    }
355
356    /// Make a [`PushDelta`] from a [`RemoteDelta`] when the remote
357    /// is not a LocalChannel.
358    pub fn to_remote_push(
359        self,
360        txn: &mut MutTxn<()>,
361        path: &[String],
362        channel: &ChannelRef<MutTxn<()>>,
363        repo: &Repository,
364    ) -> Result<PushDelta, anyhow::Error> {
365        let mut to_upload = Vec::new();
366        let inodes = get_local_inodes(txn, channel, repo, path)?;
367        if let Some(ref remote_ref) = self.remote_ref {
368            let mut tags: HashSet<Merkle> = HashSet::new();
369            for x in txn.rev_iter_tags(&channel.read().tags, None)? {
370                let (n, m) = x?;
371                debug!("rev_iter_tags {:?} {:?}", n, m);
372                // First, if the remote has exactly the same first n tags, break.
373                if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
374                    if p.b == m.b {
375                        debug!("the remote has tag {:?}", p.a);
376                        break;
377                    }
378                    if p.a != m.a {
379                        // What to do here?  It is possible that state
380                        // `n` is a different state than `m.a` in the
381                        // remote, and is also tagged.
382                    }
383                } else {
384                    tags.insert(m.a.into());
385                }
386            }
387            debug!("tags = {:?}", tags);
388            for x in txn.reverse_log(&*channel.read(), None)? {
389                let (_, (h, m)) = x?;
390                let h_unrecorded = self
391                    .remote_unrecs
392                    .iter()
393                    .any(|(_, hh)| hh == &CS::Change(h.into()));
394                if !h_unrecorded {
395                    if txn.remote_has_state(remote_ref, &m)?.is_some() {
396                        debug!("remote_has_state: {:?}", m);
397                        break;
398                    }
399                }
400                let h_int = txn.get_internal(h)?.unwrap();
401                let h_deser = Hash::from(h);
402                // For elements that are in the uncached remote changes (theirs_ge_dichotomy),
403                // don't put those in to_upload since the remote we're pushing to
404                // already has those changes.
405                if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
406                    && !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
407                {
408                    if inodes.is_empty() {
409                        if tags.remove(&m.into()) {
410                            to_upload.push(CS::State(m.into()));
411                        }
412                        to_upload.push(CS::Change(h_deser));
413                    } else {
414                        for p in inodes.iter() {
415                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
416                                to_upload.push(CS::Change(h_deser));
417                                if tags.remove(&m.into()) {
418                                    to_upload.push(CS::State(m.into()));
419                                }
420                                break;
421                            }
422                        }
423                    }
424                }
425            }
426            for t in tags.iter() {
427                if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
428                    if !txn.is_tagged(&remote_ref.lock().tags, n)? {
429                        to_upload.push(CS::State(*t));
430                    }
431                } else {
432                    debug!("the remote doesn't have state {:?}", t);
433                }
434            }
435        }
436
437        // { h | h \in theirs_ge_dichotomy /\ ~(h \in ours_ge_dichotomy) }
438        // The set of their changes >= dichotomy that aren't
439        // already known to our set of changes after the dichotomy.
440        let mut unknown_changes = Vec::new();
441        for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
442            let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
443            let change = CS::Change(*h);
444            if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
445                unknown_changes.push(change)
446            }
447            if *is_tag {
448                let m_is_known = if let Some(n) = txn
449                    .channel_has_state(txn.states(&*channel.read()), &m.into())
450                    .unwrap()
451                {
452                    txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
453                } else {
454                    false
455                };
456                if !m_is_known {
457                    unknown_changes.push(CS::State(*m))
458                }
459            }
460        }
461
462        Ok(PushDelta {
463            to_upload: to_upload.into_iter().rev().collect(),
464            remote_unrecs: self.remote_unrecs,
465            unknown_changes,
466        })
467    }
468}
469
470/// Create a [`RemoteDelta`] for a [`RemoteRepo::LocalChannel`].
471/// Since this case doesn't have a local remote cache to worry about,
472/// mainly just calculates the `to_download` list of changes.
473pub fn update_changelist_local_channel(
474    remote_channel: &str,
475    txn: &mut MutTxn<()>,
476    path: &[String],
477    current_channel: &ChannelRef<MutTxn<()>>,
478    repo: &Repository,
479    specific_changes: &[String],
480) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
481    if !specific_changes.is_empty() {
482        let mut to_download = Vec::new();
483        for h in specific_changes {
484            let h = txn.hash_from_prefix(h)?.0;
485            if txn.get_revchanges(current_channel, &h)?.is_none() {
486                to_download.push(CS::Change(h));
487            }
488        }
489        Ok(RemoteDelta {
490            inodes: HashSet::new(),
491            to_download,
492            remote_ref: None,
493            ours_ge_dichotomy_set: HashSet::new(),
494            theirs_ge_dichotomy: Vec::new(),
495            theirs_ge_dichotomy_set: HashSet::new(),
496            remote_unrecs: Vec::new(),
497        })
498    } else {
499        let mut inodes = HashSet::new();
500        let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
501        let mut to_download = Vec::new();
502        inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
503            change: txn.get_external(&x.change).unwrap().unwrap().into(),
504            pos: x.pos,
505        }));
506        if let Some(remote_channel) = txn.load_channel(remote_channel)? {
507            let remote_channel = remote_channel.read();
508            for x in txn.reverse_log(&remote_channel, None)? {
509                let (_, (h, m)) = x?;
510                if txn
511                    .channel_has_state(txn.states(&*current_channel.read()), &m)?
512                    .is_some()
513                {
514                    break;
515                }
516                let h_int = txn.get_internal(h)?.unwrap();
517                if txn
518                    .get_changeset(txn.changes(&*current_channel.read()), h_int)?
519                    .is_none()
520                {
521                    if inodes_.is_empty()
522                        || inodes_.iter().any(|&inode| {
523                            txn.get_rev_touched_files(h_int, Some(&inode))
524                                .unwrap()
525                                .is_some()
526                        })
527                    {
528                        to_download.push(CS::Change(h.into()));
529                    }
530                }
531            }
532        }
533        Ok(RemoteDelta {
534            inodes,
535            to_download,
536            remote_ref: None,
537            ours_ge_dichotomy_set: HashSet::new(),
538            theirs_ge_dichotomy: Vec::new(),
539            theirs_ge_dichotomy_set: HashSet::new(),
540            remote_unrecs: Vec::new(),
541        })
542    }
543}
544
545impl RemoteRepo {
546    fn name(&self) -> Option<&str> {
547        match *self {
548            RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
549            RemoteRepo::Local(ref l) => Some(l.name.as_str()),
550            RemoteRepo::Http(ref h) => Some(h.name.as_str()),
551            RemoteRepo::LocalChannel(_) => None,
552            RemoteRepo::None => unreachable!(),
553        }
554    }
555
556    pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
557        match *self {
558            RemoteRepo::Ssh(ref s) => {
559                if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
560                    Ok(Some(s.name.split_at(sep + 1).1.to_string()))
561                } else {
562                    Ok(Some(s.name.as_str().to_string()))
563                }
564            }
565            RemoteRepo::Local(ref l) => {
566                if let Some(file) = l.root.file_name() {
567                    Ok(Some(
568                        file.to_str()
569                            .context("failed to decode local repository name")?
570                            .to_string(),
571                    ))
572                } else {
573                    Ok(None)
574                }
575            }
576            RemoteRepo::Http(ref h) => {
577                if let Some(name) = libpijul::path::file_name(h.url.path()) {
578                    if !name.trim().is_empty() {
579                        return Ok(Some(name.trim().to_string()));
580                    }
581                }
582                Ok(h.url.host().map(|h| h.to_string()))
583            }
584            RemoteRepo::LocalChannel(_) => Ok(None),
585            RemoteRepo::None => unreachable!(),
586        }
587    }
588
589    pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
590        if let RemoteRepo::Ssh(s) = self {
591            s.finish().await?
592        }
593        Ok(())
594    }
595
596    pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
597        &mut self,
598        txn: &mut T,
599        path: &[String],
600    ) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
601        debug!("update_changelist");
602        let id = if let Some(id) = self.get_id(txn).await? {
603            id
604        } else {
605            return Ok(None);
606        };
607        let mut remote = if let Some(name) = self.name() {
608            txn.open_or_create_remote(id, name)?
609        } else {
610            return Ok(None);
611        };
612        let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
613        debug!("update changelist {:?}", n);
614        let v: Vec<_> = txn
615            .iter_remote(&remote.lock().remote, n)?
616            .filter_map(|k| {
617                debug!("filter_map {:?}", k);
618                let k = (*k.unwrap().0).into();
619                if k >= n {
620                    Some(k)
621                } else {
622                    None
623                }
624            })
625            .collect();
626        for k in v {
627            debug!("deleting {:?}", k);
628            txn.del_remote(&mut remote, k)?;
629        }
630        let v: Vec<_> = txn
631            .iter_tags(&remote.lock().tags, n)?
632            .filter_map(|k| {
633                debug!("filter_map {:?}", k);
634                let k = (*k.unwrap().0).into();
635                if k >= n {
636                    Some(k)
637                } else {
638                    None
639                }
640            })
641            .collect();
642        for k in v {
643            debug!("deleting {:?}", k);
644            txn.del_tags(&mut remote.lock().tags, k)?;
645        }
646
647        debug!("deleted");
648        let paths = self.download_changelist(txn, &mut remote, n, path).await?;
649        Ok(Some((paths, remote)))
650    }
651
652    async fn update_changelist_pushpull_from_scratch(
653        &mut self,
654        txn: &mut MutTxn<()>,
655        path: &[String],
656        current_channel: &ChannelRef<MutTxn<()>>,
657    ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
658        debug!("no id, starting from scratch");
659        let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
660        let mut theirs_ge_dichotomy_set = HashSet::new();
661        let mut to_download = Vec::new();
662        for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
663            theirs_ge_dichotomy_set.insert(CS::Change(*h));
664            if txn.get_revchanges(current_channel, h)?.is_none() {
665                to_download.push(CS::Change(*h));
666            }
667            if *is_tag {
668                let ch = current_channel.read();
669                if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
670                    if !txn.is_tagged(txn.tags(&*ch), n.into())? {
671                        to_download.push(CS::State(*m));
672                    }
673                } else {
674                    to_download.push(CS::State(*m));
675                }
676            }
677        }
678        Ok(RemoteDelta {
679            inodes,
680            remote_ref: None,
681            to_download,
682            ours_ge_dichotomy_set: HashSet::new(),
683            theirs_ge_dichotomy,
684            theirs_ge_dichotomy_set,
685            remote_unrecs: Vec::new(),
686        })
687    }
688
689    /// Creates a [`RemoteDelta`].
690    ///
691    /// IF:
692    ///    the RemoteRepo is a [`RemoteRepo::LocalChannel`], delegate to
693    ///    the simpler method [`update_changelist_local_channel`], returning the
694    ///    `to_download` list of changes.
695    ///
696    /// ELSE:
697    ///    calculate the `to_download` list of changes. Additionally, if there are
698    ///    no remote unrecords, update the local remote cache. If there are remote unrecords,
699    ///    calculate and return information about the difference between our cached version
700    ///    of the remote, and their version of the remote.
701    pub async fn update_changelist_pushpull(
702        &mut self,
703        txn: &mut MutTxn<()>,
704        path: &[String],
705        current_channel: &ChannelRef<MutTxn<()>>,
706        force_cache: Option<bool>,
707        repo: &Repository,
708        specific_changes: &[String],
709        is_pull: bool,
710    ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
711        debug!("update_changelist_pushpull");
712        if let RemoteRepo::LocalChannel(c) = self {
713            return update_changelist_local_channel(
714                c,
715                txn,
716                path,
717                current_channel,
718                repo,
719                specific_changes,
720            );
721        }
722
723        let id = if let Some(id) = self.get_id(txn).await? {
724            debug!("id = {:?}", id);
725            id
726        } else {
727            return self
728                .update_changelist_pushpull_from_scratch(txn, path, current_channel)
729                .await;
730        };
731        let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
732        let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
733        let ours_ge_dichotomy: Vec<(u64, CS)> = txn
734            .iter_remote(&remote_ref.lock().remote, dichotomy_n)?
735            .filter_map(|k| {
736                debug!("filter_map {:?}", k);
737                match k.unwrap() {
738                    (k, libpijul::pristine::Pair { a: hash, .. }) => {
739                        let (k, hash) = (u64::from(*k), Hash::from(*hash));
740                        if k >= dichotomy_n {
741                            Some((k, CS::Change(hash)))
742                        } else {
743                            None
744                        }
745                    }
746                }
747            })
748            .collect();
749        let (inodes, theirs_ge_dichotomy) =
750            self.download_changelist_nocache(dichotomy_n, path).await?;
751        debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
752        let ours_ge_dichotomy_set = ours_ge_dichotomy
753            .iter()
754            .map(|(_, h)| h)
755            .copied()
756            .collect::<HashSet<CS>>();
757        let mut theirs_ge_dichotomy_set = HashSet::new();
758        for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
759            theirs_ge_dichotomy_set.insert(CS::Change(*h));
760            if *is_tag {
761                theirs_ge_dichotomy_set.insert(CS::State(*m));
762            }
763        }
764
765        // remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }
766        let remote_unrecs = remote_unrecs(
767            txn,
768            current_channel,
769            &ours_ge_dichotomy,
770            &theirs_ge_dichotomy_set,
771        )?;
772        let should_cache = if let Some(true) = force_cache {
773            true
774        } else {
775            remote_unrecs.is_empty()
776        };
777        debug!(
778            "should_cache = {:?} {:?} {:?}",
779            force_cache, remote_unrecs, should_cache
780        );
781        if should_cache {
782            use libpijul::ChannelMutTxnT;
783            for (k, t) in ours_ge_dichotomy.iter().copied() {
784                match t {
785                    CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
786                    CS::Change(_) => {
787                        txn.del_remote(&mut remote_ref, k)?;
788                    }
789                }
790            }
791            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
792                debug!("theirs: {:?} {:?} {:?}", n, h, m);
793                txn.put_remote(&mut remote_ref, n, (h, m))?;
794                if is_tag {
795                    txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
796                }
797            }
798        }
799        if !specific_changes.is_empty() {
800            // Here, the user only wanted to push/pull specific changes
801            let to_download = specific_changes
802                .iter()
803                .map(|h| {
804                    if is_pull {
805                        {
806                            if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
807                                return Ok(CS::State(t.0));
808                            }
809                        }
810                        Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
811                    } else {
812                        if let Ok(t) = txn.state_from_prefix(&current_channel.read().states, h) {
813                            Ok(CS::State(t.0))
814                        } else {
815                            Ok(CS::Change(txn.hash_from_prefix(h)?.0))
816                        }
817                    }
818                })
819                .collect::<Result<Vec<_>, anyhow::Error>>();
820            Ok(RemoteDelta {
821                inodes,
822                remote_ref: Some(remote_ref),
823                to_download: to_download?,
824                ours_ge_dichotomy_set,
825                theirs_ge_dichotomy,
826                theirs_ge_dichotomy_set,
827                remote_unrecs,
828            })
829        } else {
830            let mut to_download: Vec<CS> = Vec::new();
831            let mut to_download_ = HashSet::new();
832            for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
833                let (_, p) = x?;
834                let h: Hash = p.a.into();
835                if txn
836                    .channel_has_state(txn.states(&current_channel.read()), &p.b)
837                    .unwrap()
838                    .is_some()
839                {
840                    break;
841                }
842                if txn.get_revchanges(&current_channel, &h).unwrap().is_none() {
843                    let h = CS::Change(h);
844                    if to_download_.insert(h.clone()) {
845                        to_download.push(h);
846                    }
847                }
848            }
849
850            // The patches in theirs_ge_dichotomy are unknown to us,
851            // download them.
852            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
853                debug!(
854                    "update_changelist_pushpull line {}, {:?} {:?}",
855                    line!(),
856                    n,
857                    h
858                );
859                // In all cases, add this new change/state/tag to `to_download`.
860                let ch = CS::Change(*h);
861                if txn.get_revchanges(&current_channel, h).unwrap().is_none() {
862                    if to_download_.insert(ch.clone()) {
863                        to_download.push(ch.clone());
864                    }
865                    if *is_tag {
866                        to_download.push(CS::State(*m));
867                    }
868                } else if *is_tag {
869                    let has_tag = if let Some(n) =
870                        txn.channel_has_state(txn.states(&current_channel.read()), &m.into())?
871                    {
872                        txn.is_tagged(txn.tags(&current_channel.read()), n.into())?
873                    } else {
874                        false
875                    };
876                    if !has_tag {
877                        to_download.push(CS::State(*m));
878                    }
879                }
880                // Additionally, if there are no remote unrecords
881                // (i.e. if `should_cache`), cache.
882                if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
883                    use libpijul::ChannelMutTxnT;
884                    txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
885                    if *is_tag {
886                        let mut rem = remote_ref.lock();
887                        txn.put_tags(&mut rem.tags, *n, m)?;
888                    }
889                }
890            }
891            Ok(RemoteDelta {
892                inodes,
893                remote_ref: Some(remote_ref),
894                to_download,
895                ours_ge_dichotomy_set,
896                theirs_ge_dichotomy,
897                theirs_ge_dichotomy_set,
898                remote_unrecs,
899            })
900        }
901    }
902
903    /// Get the list of the remote's changes that come after `from: u64`.
904    /// Instead of immediately updating the local cache of the remote, return
905    /// the change info without changing the cache.
906    pub async fn download_changelist_nocache(
907        &mut self,
908        from: u64,
909        paths: &[String],
910    ) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
911        let mut v = Vec::new();
912        let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, m2| {
913            debug!("no cache: {:?}", h);
914            Ok(v.push((n, h, m, m2)))
915        };
916        let r = match *self {
917            RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
918            RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
919            RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
920            RemoteRepo::LocalChannel(_) => HashSet::new(),
921            RemoteRepo::None => unreachable!(),
922        };
923        Ok((r, v))
924    }
925
926    /// Uses a binary search to find the integer identifier of the last point
927    /// at which our locally cached version of the remote was the same as the 'actual'
928    /// state of the remote.
929    async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
930        &mut self,
931        txn: &T,
932        remote: &libpijul::pristine::Remote<T>,
933    ) -> Result<u64, anyhow::Error> {
934        let mut a = 0;
935        let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
936            debug!("dichotomy_changelist: {:?} {:?}", u, v);
937            (u, (&v.b).into())
938        } else {
939            debug!("the local copy of the remote has no changes");
940            return Ok(0);
941        };
942        let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
943            v.into()
944        } else {
945            Merkle::zero()
946        };
947        debug!("last_state: {:?} {:?}", state, last_statet);
948        if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
949            debug!("remote last_state: {:?} {:?}", s, st);
950            if s == state && st == last_statet {
951                // The local list is already up to date.
952                return Ok(b + 1);
953            }
954        }
955        // Else, find the last state we have in common with the
956        // remote, it might be older than the last known state (if
957        // changes were unrecorded on the remote).
958        while a < b {
959            let mid = (a + b) / 2;
960            let (mid, state) = {
961                let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
962                (a, b.b)
963            };
964            let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
965                // There's still a tag at position >= mid in the
966                // sequence.
967                b.b.into()
968            } else {
969                // No tag at or after mid, the last state, `statet`,
970                // is the right answer in that case.
971                last_statet
972            };
973
974            let remote_state = self.get_state(txn, Some(mid)).await?;
975            debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
976            if let Some((_, remote_state, remote_statet)) = remote_state {
977                if remote_state == state && remote_statet == statet {
978                    if a == mid {
979                        return Ok(a + 1);
980                    } else {
981                        a = mid;
982                        continue;
983                    }
984                }
985            }
986            if b == mid {
987                break;
988            } else {
989                b = mid
990            }
991        }
992        Ok(a)
993    }
994
995    async fn get_state<T: libpijul::TxnTExt>(
996        &mut self,
997        txn: &T,
998        mid: Option<u64>,
999    ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
1000        match *self {
1001            RemoteRepo::Local(ref mut l) => l.get_state(mid),
1002            RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
1003            RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
1004            RemoteRepo::LocalChannel(ref channel) => {
1005                if let Some(channel) = txn.load_channel(&channel)? {
1006                    local::get_state(txn, &channel, mid)
1007                } else {
1008                    Ok(None)
1009                }
1010            }
1011            RemoteRepo::None => unreachable!(),
1012        }
1013    }
1014
1015    /// This method might return `Ok(None)` in some cases, for example
1016    /// if the remote wants to indicate not to store a cache. This is
1017    /// the case for Nest channels, for example.
1018    async fn get_id<T: libpijul::TxnTExt + 'static>(
1019        &mut self,
1020        txn: &T,
1021    ) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
1022        match *self {
1023            RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
1024            RemoteRepo::Ssh(ref mut s) => s.get_id().await,
1025            RemoteRepo::Http(ref h) => h.get_id().await,
1026            RemoteRepo::LocalChannel(ref channel) => {
1027                if let Some(channel) = txn.load_channel(&channel)? {
1028                    Ok(txn.id(&*channel.read()).cloned())
1029                } else {
1030                    Err(anyhow::anyhow!(
1031                        "Unable to retrieve RemoteId for LocalChannel remote"
1032                    ))
1033                }
1034            }
1035            RemoteRepo::None => unreachable!(),
1036        }
1037    }
1038
1039    pub async fn archive<W: std::io::Write + Send + 'static>(
1040        &mut self,
1041        prefix: Option<String>,
1042        state: Option<(Merkle, &[Hash])>,
1043        umask: u16,
1044        w: W,
1045    ) -> Result<u64, anyhow::Error> {
1046        match *self {
1047            RemoteRepo::Local(ref mut l) => {
1048                debug!("archiving local repo");
1049                let changes = libpijul::changestore::filesystem::FileSystem::from_root(
1050                    &l.root,
1051                    pijul_repository::max_files()?,
1052                );
1053                let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
1054                let conflicts = if let Some((state, extra)) = state {
1055                    let txn = l.pristine.arc_txn_begin()?;
1056                    let channel = {
1057                        let txn = txn.read();
1058                        txn.load_channel(&l.channel)?.unwrap()
1059                    };
1060                    txn.archive_with_state(&changes, &channel, &state, extra, &mut tarball, 0)?
1061                } else {
1062                    let txn = l.pristine.arc_txn_begin()?;
1063                    let channel = {
1064                        let txn = txn.read();
1065                        txn.load_channel(&l.channel)?.unwrap()
1066                    };
1067                    txn.archive(&changes, &channel, &mut tarball)?
1068                };
1069                Ok(conflicts.len() as u64)
1070            }
1071            RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
1072            RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
1073            RemoteRepo::LocalChannel(_) => unreachable!(),
1074            RemoteRepo::None => unreachable!(),
1075        }
1076    }
1077
1078    async fn download_changelist<T: MutTxnTExt>(
1079        &mut self,
1080        txn: &mut T,
1081        remote: &mut RemoteRef<T>,
1082        from: u64,
1083        paths: &[String],
1084    ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
1085        let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
1086            let (ref mut txn, ref mut remote) = *a;
1087            txn.put_remote(remote, n, (h, m))?;
1088            if is_tag {
1089                txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
1090            }
1091            Ok(())
1092        };
1093        match *self {
1094            RemoteRepo::Local(ref mut l) => {
1095                l.download_changelist(f, &mut (txn, remote), from, paths)
1096            }
1097            RemoteRepo::Ssh(ref mut s) => {
1098                s.download_changelist(f, &mut (txn, remote), from, paths)
1099                    .await
1100            }
1101            RemoteRepo::Http(ref h) => {
1102                h.download_changelist(f, &mut (txn, remote), from, paths)
1103                    .await
1104            }
1105            RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
1106            RemoteRepo::None => unreachable!(),
1107        }
1108    }
1109
1110    pub async fn upload_changes<T: MutTxnTExt + 'static>(
1111        &mut self,
1112        txn: &mut T,
1113        local: PathBuf,
1114        to_channel: Option<&str>,
1115        changes: &[CS],
1116    ) -> Result<(), anyhow::Error> {
1117        let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
1118
1119        match self {
1120            RemoteRepo::Local(ref mut l) => {
1121                l.upload_changes(upload_bar, local, to_channel, changes)?
1122            }
1123            RemoteRepo::Ssh(ref mut s) => {
1124                s.upload_changes(upload_bar, local, to_channel, changes)
1125                    .await?
1126            }
1127            RemoteRepo::Http(ref h) => {
1128                h.upload_changes(upload_bar, local, to_channel, changes)
1129                    .await?
1130            }
1131            RemoteRepo::LocalChannel(ref channel) => {
1132                let mut channel = txn.open_or_create_channel(channel)?;
1133                let store = libpijul::changestore::filesystem::FileSystem::from_changes(
1134                    local,
1135                    pijul_repository::max_files()?,
1136                );
1137                local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
1138            }
1139            RemoteRepo::None => unreachable!(),
1140        }
1141        Ok(())
1142    }
1143
1144    /// Start (and possibly complete) the download of a change.
1145    pub async fn download_changes(
1146        &mut self,
1147        progress_bar: ProgressBar,
1148        hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
1149        send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
1150        path: &mut PathBuf,
1151        full: bool,
1152    ) -> Result<bool, anyhow::Error> {
1153        debug!("download_changes");
1154        match *self {
1155            RemoteRepo::Local(ref mut l) => {
1156                l.download_changes(progress_bar, hashes, send, path).await?
1157            }
1158            RemoteRepo::Ssh(ref mut s) => {
1159                s.download_changes(progress_bar, hashes, send, path, full)
1160                    .await?
1161            }
1162            RemoteRepo::Http(ref mut h) => {
1163                h.download_changes(progress_bar, hashes, send, path, full)
1164                    .await?
1165            }
1166            RemoteRepo::LocalChannel(_) => {
1167                while let Some(c) = hashes.recv().await {
1168                    send.send((c, true)).await?;
1169                }
1170            }
1171            RemoteRepo::None => unreachable!(),
1172        }
1173        Ok(true)
1174    }
1175
1176    pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
1177        &mut self,
1178        repo: &mut Repository,
1179        remote: &RemoteRef<T>,
1180    ) -> Result<(), anyhow::Error> {
1181        debug!("Downloading identities");
1182        let mut id_path = repo.path.clone();
1183        id_path.push(DOT_DIR);
1184        id_path.push("identities");
1185        let rev = None;
1186        let r = match *self {
1187            RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
1188            RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
1189            RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
1190            RemoteRepo::LocalChannel(_) => 0,
1191            RemoteRepo::None => unreachable!(),
1192        };
1193        remote.set_id_revision(r);
1194        Ok(())
1195    }
1196
1197    pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
1198        match *self {
1199            RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
1200            RemoteRepo::Http(ref mut h) => h.prove(key).await,
1201            RemoteRepo::None => unreachable!(),
1202            _ => Ok(()),
1203        }
1204    }
1205
1206    pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1207        &mut self,
1208        repo: &mut Repository,
1209        txn: &mut T,
1210        channel: &mut ChannelRef<T>,
1211        to_apply: &[CS],
1212        inodes: &HashSet<Position<Hash>>,
1213        do_apply: bool,
1214    ) -> Result<Vec<CS>, anyhow::Error> {
1215        let apply_len = to_apply.len() as u64;
1216        let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
1217        let apply_bar = if do_apply {
1218            Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
1219        } else {
1220            None
1221        };
1222
1223        let (mut send, recv) = tokio::sync::mpsc::channel(100);
1224
1225        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1226        let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
1227        let mut change_path_ = repo.path.clone();
1228        change_path_.push(DOT_DIR);
1229        change_path_.push("changes");
1230        let cloned_download_bar = download_bar.clone();
1231        let t = tokio::spawn(async move {
1232            self_
1233                .download_changes(
1234                    cloned_download_bar,
1235                    &mut hash_recv,
1236                    &mut send,
1237                    &mut change_path_,
1238                    false,
1239                )
1240                .await?;
1241
1242            Ok::<_, anyhow::Error>(self_)
1243        });
1244
1245        let mut change_path_ = repo.changes_dir.clone();
1246        let mut waiting = 0;
1247        let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1248
1249        let mut asked = HashSet::new();
1250        for h in to_apply {
1251            debug!("to_apply {:?}", h);
1252            match h {
1253                CS::Change(h) => {
1254                    libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
1255                }
1256                CS::State(h) => {
1257                    libpijul::changestore::filesystem::push_tag_filename(&mut change_path_, h);
1258                }
1259            }
1260            asked.insert(*h);
1261            hash_send.send(*h)?;
1262            waiting += 1;
1263            libpijul::changestore::filesystem::pop_filename(&mut change_path_);
1264        }
1265
1266        let u = self
1267            .download_changes_rec(
1268                repo,
1269                hash_send,
1270                recv,
1271                send_ready,
1272                download_bar,
1273                waiting,
1274                asked,
1275            )
1276            .await?;
1277
1278        let mut ws = libpijul::ApplyWorkspace::new();
1279        let mut to_apply_inodes = HashSet::new();
1280        while let Some(h) = recv_ready.recv().await {
1281            debug!("to_apply: {:?}", h);
1282            let touches_inodes = inodes.is_empty()
1283                || {
1284                    debug!("inodes = {:?}", inodes);
1285                    use libpijul::changestore::ChangeStore;
1286                    if let CS::Change(ref h) = h {
1287                        let changes = repo.changes.get_changes(h)?;
1288                        changes.iter().any(|c| {
1289                            c.iter().any(|c| {
1290                                let inode = c.inode();
1291                                debug!("inode = {:?}", inode);
1292                                if let Some(h) = inode.change {
1293                                    inodes.contains(&Position {
1294                                        change: h,
1295                                        pos: inode.pos,
1296                                    })
1297                                } else {
1298                                    false
1299                                }
1300                            })
1301                        })
1302                    } else {
1303                        false
1304                    }
1305                }
1306                || { inodes.iter().any(|i| CS::Change(i.change) == h) };
1307
1308            if touches_inodes {
1309                to_apply_inodes.insert(h);
1310            } else {
1311                continue;
1312            }
1313
1314            if let Some(apply_bar) = apply_bar.clone() {
1315                info!("Applying {:?}", h);
1316                apply_bar.inc(1);
1317                debug!("apply");
1318                if let CS::Change(h) = h {
1319                    let mut channel = channel.write();
1320                    txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
1321                }
1322                debug!("applied");
1323            } else {
1324                debug!("not applying {:?}", h)
1325            }
1326        }
1327
1328        let mut result = Vec::with_capacity(to_apply_inodes.len());
1329        for h in to_apply {
1330            if to_apply_inodes.contains(&h) {
1331                result.push(*h)
1332            }
1333        }
1334
1335        debug!("finished");
1336        debug!("waiting for spawned process");
1337        *self = t.await??;
1338        u.await??;
1339        Ok(result)
1340    }
1341
1342    async fn download_changes_rec(
1343        &mut self,
1344        repo: &mut Repository,
1345        send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
1346        mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
1347        send_ready: tokio::sync::mpsc::Sender<CS>,
1348        progress_bar: ProgressBar,
1349        mut waiting: usize,
1350        mut asked: HashSet<CS>,
1351    ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
1352        let mut change_path = repo.changes_dir.clone();
1353        let mut dep_path = repo.changes_dir.clone();
1354        let changes = repo.changes.clone();
1355        let t = tokio::spawn(async move {
1356            if waiting == 0 {
1357                return Ok(());
1358            }
1359            let mut ready = Vec::new();
1360            while let Some((hash, follow)) = recv_signal.recv().await {
1361                debug!("received {:?} {:?}", hash, follow);
1362                if let CS::Change(hash) = hash {
1363                    waiting -= 1;
1364                    if follow {
1365                        libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
1366                        std::fs::create_dir_all(change_path.parent().unwrap())?;
1367                        use libpijul::changestore::ChangeStore;
1368                        let mut needs_dep = false;
1369                        for dep in changes.get_dependencies(&hash)? {
1370                            let dep: libpijul::pristine::Hash = dep;
1371
1372                            libpijul::changestore::filesystem::push_filename(&mut dep_path, &dep);
1373                            let has_dep = std::fs::metadata(&dep_path).is_ok();
1374                            libpijul::changestore::filesystem::pop_filename(&mut dep_path);
1375
1376                            if !has_dep {
1377                                needs_dep = true;
1378                                if asked.insert(CS::Change(dep)) {
1379                                    progress_bar.inc(1);
1380                                    send_hash.send(CS::Change(dep))?;
1381                                    waiting += 1
1382                                }
1383                            }
1384                        }
1385                        libpijul::changestore::filesystem::pop_filename(&mut change_path);
1386
1387                        if !needs_dep {
1388                            send_ready.send(CS::Change(hash)).await?;
1389                        } else {
1390                            ready.push(CS::Change(hash))
1391                        }
1392                    } else {
1393                        send_ready.send(CS::Change(hash)).await?;
1394                    }
1395                }
1396                if waiting == 0 {
1397                    break;
1398                }
1399            }
1400            info!("waiting loop done");
1401            for r in ready {
1402                send_ready.send(r).await?;
1403            }
1404            std::mem::drop(recv_signal);
1405            Ok(())
1406        });
1407        Ok(t)
1408    }
1409
1410    pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1411        &mut self,
1412        repo: &mut Repository,
1413        txn: &mut T,
1414        channel: &mut ChannelRef<T>,
1415        tag: &[Hash],
1416    ) -> Result<(), anyhow::Error> {
1417        let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1418        let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
1419        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1420        let mut change_path_ = repo.changes_dir.clone();
1421        let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
1422        let cloned_download_bar = download_bar.clone();
1423
1424        let t = tokio::spawn(async move {
1425            self_
1426                .download_changes(
1427                    cloned_download_bar,
1428                    &mut recv_hash,
1429                    &mut send_signal,
1430                    &mut change_path_,
1431                    false,
1432                )
1433                .await?;
1434            Ok(self_)
1435        });
1436
1437        let mut waiting = 0;
1438        let mut asked = HashSet::new();
1439        for &h in tag.iter() {
1440            waiting += 1;
1441            send_hash.send(CS::Change(h))?;
1442            asked.insert(CS::Change(h));
1443        }
1444
1445        let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1446
1447        let u = self
1448            .download_changes_rec(
1449                repo,
1450                send_hash,
1451                recv_signal,
1452                send_ready,
1453                download_bar,
1454                waiting,
1455                asked,
1456            )
1457            .await?;
1458
1459        let mut hashes = Vec::new();
1460        let mut ws = libpijul::ApplyWorkspace::new();
1461        {
1462            let mut channel_ = channel.write();
1463            while let Some(hash) = recv_ready.recv().await {
1464                if let CS::Change(ref hash) = hash {
1465                    txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
1466                }
1467                hashes.push(hash);
1468            }
1469        }
1470        let r: Result<_, anyhow::Error> = t.await?;
1471        *self = r?;
1472        u.await??;
1473        self.complete_changes(repo, txn, channel, &hashes, false)
1474            .await?;
1475        Ok(())
1476    }
1477
1478    pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1479        &mut self,
1480        repo: &mut Repository,
1481        txn: &mut T,
1482        channel: &mut ChannelRef<T>,
1483        state: Merkle,
1484    ) -> Result<(), anyhow::Error> {
1485        let id = if let Some(id) = self.get_id(txn).await? {
1486            id
1487        } else {
1488            return Ok(());
1489        };
1490        self.update_changelist(txn, &[]).await?;
1491        let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
1492        let mut to_pull = Vec::new();
1493        let mut found = false;
1494        for x in txn.iter_remote(&remote.lock().remote, 0)? {
1495            let (n, p) = x?;
1496            debug!("{:?} {:?}", n, p);
1497            to_pull.push(CS::Change(p.a.into()));
1498            if p.b == state {
1499                found = true;
1500                break;
1501            }
1502        }
1503        if !found {
1504            bail!("State not found: {:?}", state)
1505        }
1506        self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
1507            .await?;
1508        self.update_identities(repo, &remote).await?;
1509
1510        self.complete_changes(repo, txn, channel, &to_pull, false)
1511            .await?;
1512        Ok(())
1513    }
1514
1515    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
1516        &mut self,
1517        repo: &pijul_repository::Repository,
1518        txn: &T,
1519        local_channel: &mut ChannelRef<T>,
1520        changes: &[CS],
1521        full: bool,
1522    ) -> Result<(), anyhow::Error> {
1523        debug!("complete changes {:?}", changes);
1524        use libpijul::changestore::ChangeStore;
1525        let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1526        let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
1527        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1528        let mut changes_dir = repo.changes_dir.clone();
1529
1530        let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
1531        let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
1532        let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
1533            tokio::spawn(async move {
1534                self_
1535                    .download_changes(
1536                        download_bar,
1537                        &mut recv_hash,
1538                        &mut send_sig,
1539                        &mut changes_dir,
1540                        true,
1541                    )
1542                    .await?;
1543                Ok::<_, anyhow::Error>(self_)
1544            });
1545
1546        for c in changes {
1547            let c = if let CS::Change(c) = c { c } else { continue };
1548            let sc = c.into();
1549            if repo
1550                .changes
1551                .has_contents(*c, txn.get_internal(&sc)?.cloned())
1552            {
1553                debug!("has contents {:?}", c);
1554                continue;
1555            }
1556            if full {
1557                debug!("sending send_hash");
1558                send_hash.send(CS::Change(*c))?;
1559                debug!("sent");
1560                continue;
1561            }
1562            let change = if let Some(&i) = txn.get_internal(&sc)? {
1563                i
1564            } else {
1565                debug!("could not find internal for {:?}", sc);
1566                continue;
1567            };
1568            // Check if at least one non-empty vertex from c is still alive.
1569            let v = libpijul::pristine::Vertex {
1570                change,
1571                start: libpijul::pristine::ChangePosition(0u64.into()),
1572                end: libpijul::pristine::ChangePosition(0u64.into()),
1573            };
1574            let channel = local_channel.read();
1575            let graph = txn.graph(&channel);
1576            for x in txn.iter_graph(graph, Some(&v))? {
1577                let (v, e) = x?;
1578                if v.change > change {
1579                    break;
1580                } else if e.flag().is_alive_parent() {
1581                    send_hash.send(CS::Change(*c))?;
1582                    break;
1583                }
1584            }
1585        }
1586        debug!("dropping send_hash");
1587        std::mem::drop(send_hash);
1588        while recv_sig.recv().await.is_some() {}
1589        *self = t.await??;
1590        Ok(())
1591    }
1592
1593    pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1594        &mut self,
1595        repo: &mut Repository,
1596        txn: &mut T,
1597        local_channel: &mut ChannelRef<T>,
1598        path: &[String],
1599    ) -> Result<(), anyhow::Error> {
1600        let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
1601            x
1602        } else {
1603            bail!("Channel not found")
1604        };
1605        let mut pullable = Vec::new();
1606        {
1607            let rem = remote_changes.lock();
1608            for x in txn.iter_remote(&rem.remote, 0)? {
1609                let (_, p) = x?;
1610                pullable.push(CS::Change(p.a.into()))
1611            }
1612        }
1613        self.pull(repo, txn, local_channel, &pullable, &inodes, true)
1614            .await?;
1615        self.update_identities(repo, &remote_changes).await?;
1616
1617        self.complete_changes(repo, txn, local_channel, &pullable, false)
1618            .await?;
1619        Ok(())
1620    }
1621}
1622
1623use libpijul::pristine::{ChangePosition, Position};
1624use regex::Regex;
1625
1626lazy_static! {
1627    static ref CHANGELIST_LINE: Regex = Regex::new(
1628        r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
1629    )
1630    .unwrap();
1631    static ref PATHS_LINE: Regex =
1632        Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
1633}
1634
1635enum ListLine {
1636    Change {
1637        n: u64,
1638        h: Hash,
1639        m: Merkle,
1640        tag: bool,
1641    },
1642    Position(Position<Hash>),
1643    Error(String),
1644}
1645
1646fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
1647    debug!("data = {:?}", data);
1648    if let Some(caps) = CHANGELIST_LINE.captures(data) {
1649        if let (Some(h), Some(m)) = (
1650            Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
1651            Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
1652        ) {
1653            return Ok(ListLine::Change {
1654                n: caps.name("num").unwrap().as_str().parse().unwrap(),
1655                h,
1656                m,
1657                tag: caps.name("tag").is_some(),
1658            });
1659        }
1660    }
1661    if data.starts_with("error:") {
1662        return Ok(ListLine::Error(data.split_at(6).1.to_string()));
1663    }
1664    if let Some(caps) = PATHS_LINE.captures(data) {
1665        return Ok(ListLine::Position(Position {
1666            change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
1667            pos: ChangePosition(
1668                caps.name("num")
1669                    .unwrap()
1670                    .as_str()
1671                    .parse::<u64>()
1672                    .unwrap()
1673                    .into(),
1674            ),
1675        }));
1676    }
1677    debug!("offending line: {:?}", data);
1678    bail!("Protocol error")
1679}
1680
1681/// Compare the remote set (theirs_ge_dichotomy) with our current
1682/// version of that (ours_ge_dichotomy) and return the changes in our
1683/// current version that are not in the remote anymore.
1684fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
1685    txn: &T,
1686    current_channel: &ChannelRef<T>,
1687    ours_ge_dichotomy: &[(u64, CS)],
1688    theirs_ge_dichotomy_set: &HashSet<CS>,
1689) -> Result<Vec<(u64, CS)>, anyhow::Error> {
1690    let mut remote_unrecs = Vec::new();
1691    for (n, hash) in ours_ge_dichotomy {
1692        debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
1693        if theirs_ge_dichotomy_set.contains(hash) {
1694            // If this change is still present in the remote, skip
1695            debug!("still present");
1696            continue;
1697        } else {
1698            let has_it = match hash {
1699                CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
1700                CS::State(state) => {
1701                    let ch = current_channel.read();
1702                    if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
1703                        txn.is_tagged(txn.tags(&*ch), n.into())?
1704                    } else {
1705                        false
1706                    }
1707                }
1708            };
1709            if has_it {
1710                remote_unrecs.push((*n, *hash))
1711            } else {
1712                // If this unrecord wasn't in our current channel, skip
1713                continue;
1714            }
1715        }
1716    }
1717    Ok(remote_unrecs)
1718}