Skip to main content

inflorescence_pijul_remote/
lib.rs

1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, LazyLock};
5
6use anyhow::{Context, bail};
7use libpijul::DOT_DIR;
8use libpijul::pristine::{
9    Base32, ChangeId, ChangePosition, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, Position,
10    RemoteRef, TxnT, sanakirja::MutTxn, sanakirja::RawMutTxnT,
11};
12use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
13use log::{debug, info};
14use regex::Regex;
15
16use pijul_config::remote::{RemoteConfig, RemoteHttpHeader};
17use pijul_identity::Complete;
18use pijul_repository::*;
19
20pub mod ssh;
21use ssh::*;
22
23pub mod local;
24use local::*;
25
26pub mod http;
27use http::*;
28
29use pijul_interaction::{
30    APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, ProgressBar, Spinner, UPLOAD_MESSAGE,
31};
32
/// Version number of the remote protocol (currently 3).
/// NOTE(review): not referenced in this part of the file; presumably the
/// `ssh`/`http` modules send or check it — confirm there.
pub const PROTOCOL_VERSION: usize = 3;
34
/// A remote that changes can be pushed to or pulled from, as resolved by
/// [`repository`] / [`unknown_remote`].
pub enum RemoteRepo {
    /// A repository on the local filesystem, opened through its pristine database.
    Local(Local),
    /// A repository reached over SSH.
    Ssh(Ssh),
    /// A repository reached over HTTP(S).
    Http(Http),
    /// Another channel of the repository we are already in; holds the channel name.
    LocalChannel(String),
    /// No remote. `name` and `repo_name` treat this variant as unreachable.
    None,
}
42
/// An item exchanged with a remote: either a change, identified by its hash,
/// or a state, identified by its Merkle hash. In this file `State` entries
/// are used for tagged states.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CS {
    /// A change, by hash.
    Change(Hash),
    /// A channel state, by Merkle hash; used here for tags.
    State(Merkle),
}
48
49pub async fn repository(
50    config: &pijul_config::Config,
51    self_path: Option<&Path>,
52    // User name in case it isn't provided in the `name` argument already.
53    user: Option<&str>,
54    name: &str,
55    channel: &str,
56    no_cert_check: bool,
57    with_path: bool,
58) -> Result<RemoteRepo, anyhow::Error> {
59    if let Some(name) = config.remotes.iter().find(|e| e.name() == name) {
60        name.to_remote(config, channel, no_cert_check, with_path)
61            .await
62    } else {
63        unknown_remote(
64            config,
65            self_path,
66            user,
67            name,
68            channel,
69            no_cert_check,
70            with_path,
71        )
72        .await
73    }
74}
75
76/// Associate a generated key with a remote identity. Patches authored
77/// by unproven keys will only display the key as the author.
78pub async fn prove(
79    config: &pijul_config::Config,
80    identity: &Complete,
81    origin: Option<&str>,
82    no_cert_check: bool,
83    use_keyring: bool,
84) -> Result<(), anyhow::Error> {
85    let remote = origin.unwrap_or(&identity.config.author.origin);
86    let mut stderr = std::io::stderr();
87    writeln!(
88        stderr,
89        "Linking identity `{}` with {}@{}",
90        &identity.name, &identity.config.author.username, remote
91    )?;
92
93    let mut remote = repository(
94        config,
95        None,
96        Some(&identity.config.author.username),
97        &remote,
98        libpijul::DEFAULT_CHANNEL,
99        no_cert_check,
100        false,
101    )
102    .await?;
103
104    let (key, _password) =
105        identity
106            .credentials
107            .clone()
108            .unwrap()
109            .decrypt(config, &identity.name, use_keyring)?;
110    remote.prove(key).await?;
111
112    Ok(())
113}
114
115fn shell_cmd(s: &str) -> Result<String, anyhow::Error> {
116    let out = if cfg!(target_os = "windows") {
117        std::process::Command::new("cmd")
118            .args(&["/C", s])
119            .output()
120            .expect("failed to execute process")
121    } else {
122        std::process::Command::new(std::env::var("SHELL").unwrap_or("sh".to_string()))
123            .arg("-c")
124            .arg(s)
125            .output()
126            .expect("failed to execute process")
127    };
128    Ok(String::from_utf8(out.stdout)?.trim().to_string())
129}
130
/// Conversion of a remote declared in the configuration into a [`RemoteRepo`].
// `async fn` in a public trait is allowed deliberately; the lint warns that
// callers cannot require the returned future to be `Send`.
#[allow(async_fn_in_trait)]
pub trait ToRemote {
    /// Build a [`RemoteRepo`] targeting `channel`.
    ///
    /// In the `RemoteConfig` implementation, `no_cert_check` disables TLS
    /// certificate validation for HTTP remotes, and `with_path` is forwarded
    /// to `ssh_remote` for SSH remotes.
    async fn to_remote(
        &self,
        config: &pijul_config::Config,
        channel: &str,
        no_cert_check: bool,
        with_path: bool,
    ) -> Result<RemoteRepo, anyhow::Error>;
}
141
142impl ToRemote for RemoteConfig {
143    async fn to_remote(
144        &self,
145        config: &pijul_config::Config,
146        channel: &str,
147        no_cert_check: bool,
148        with_path: bool,
149    ) -> Result<RemoteRepo, anyhow::Error> {
150        match self {
151            RemoteConfig::Ssh { ssh, .. } => {
152                if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
153                    debug!("unknown_remote, ssh = {:?}", ssh);
154                    if let Some(c) = sshr.connect(config, ssh, channel).await? {
155                        return Ok(RemoteRepo::Ssh(c));
156                    }
157                }
158                bail!("Remote not found: {:?}", ssh)
159            }
160            RemoteConfig::Http {
161                http,
162                headers,
163                name,
164            } => {
165                let mut h = Vec::new();
166                for (k, v) in headers.iter() {
167                    match v {
168                        RemoteHttpHeader::String(s) => {
169                            h.push((k.clone(), s.clone()));
170                        }
171                        RemoteHttpHeader::Shell(shell) => {
172                            h.push((k.clone(), shell_cmd(&shell.shell)?));
173                        }
174                    }
175                }
176                return Ok(RemoteRepo::Http(Http {
177                    url: http.parse().unwrap(),
178                    channel: channel.to_string(),
179                    client: reqwest::ClientBuilder::new()
180                        .danger_accept_invalid_certs(no_cert_check)
181                        .build()?,
182                    headers: h,
183                    name: name.to_string(),
184                }));
185            }
186        }
187    }
188}
189
190pub async fn unknown_remote(
191    config: &pijul_config::Config,
192    self_path: Option<&Path>,
193    user: Option<&str>,
194    name: &str,
195    channel: &str,
196    no_cert_check: bool,
197    with_path: bool,
198) -> Result<RemoteRepo, anyhow::Error> {
199    if let Ok(url) = url::Url::parse(name) {
200        let scheme = url.scheme();
201        if scheme == "http" || scheme == "https" {
202            debug!("unknown_remote, http = {:?}", name);
203            return Ok(RemoteRepo::Http(Http {
204                url,
205                channel: channel.to_string(),
206                client: reqwest::ClientBuilder::new()
207                    .danger_accept_invalid_certs(no_cert_check)
208                    .build()?,
209                headers: Vec::new(),
210                name: name.to_string(),
211            }));
212        } else if scheme == "ssh" {
213            if let Some(mut ssh) = ssh_remote(user, name, with_path) {
214                debug!("unknown_remote, ssh = {:?}", ssh);
215                if let Some(c) = ssh.connect(config, name, channel).await? {
216                    return Ok(RemoteRepo::Ssh(c));
217                }
218            }
219            bail!("Remote not found: {:?}", name)
220        } else {
221            bail!("Remote scheme not supported: {:?}", scheme)
222        }
223    }
224    if let Ok(root) = std::fs::canonicalize(name) {
225        if let Some(path) = self_path {
226            let path = std::fs::canonicalize(path)?;
227            if path == root {
228                return Ok(RemoteRepo::LocalChannel(channel.to_string()));
229            }
230        }
231
232        let mut dot_dir = root.join(DOT_DIR);
233        let changes_dir = dot_dir.join(CHANGES_DIR);
234
235        dot_dir.push(PRISTINE_DIR);
236        debug!("dot_dir = {:?}", dot_dir);
237        match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
238            Ok(pristine) => {
239                debug!("pristine done");
240                return Ok(RemoteRepo::Local(Local {
241                    root: Path::new(name).to_path_buf(),
242                    channel: channel.to_string(),
243                    changes_dir,
244                    pristine: Arc::new(pristine),
245                    name: name.to_string(),
246                }));
247            }
248            Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
249                sanakirja::Error::IO(e),
250            )) if e.kind() == std::io::ErrorKind::NotFound => {
251                debug!("repo not found")
252            }
253            Err(e) => return Err(e.into()),
254        }
255    }
256    if let Some(mut ssh) = ssh_remote(user, name, with_path) {
257        debug!("unknown_remote, ssh = {:?}", ssh);
258        if let Some(c) = ssh.connect(config, name, channel).await? {
259            return Ok(RemoteRepo::Ssh(c));
260        }
261    }
262    bail!("Remote not found: {:?}", name)
263}
264
265// Extracting this saves a little bit of duplication.
266pub fn get_local_inodes<T: RawMutTxnT + 'static>(
267    txn: &mut MutTxn<T>,
268    channel: &ChannelRef<MutTxn<T>>,
269    repo: &Repository,
270    path: &[String],
271) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
272    let mut paths = HashSet::new();
273    for path in path.iter() {
274        let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
275        if ambiguous {
276            bail!("Ambiguous path: {:?}", path)
277        }
278        paths.insert(p);
279        paths.extend(
280            libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
281        );
282    }
283    Ok(paths)
284}
285
/// Embellished [`RemoteDelta`] that has information specific
/// to a push operation. We want to know what our options are
/// for changes to upload, whether the remote has unrecorded relevant changes,
/// and whether the remote has changes we don't know about, since those might
/// affect whether or not we actually want to go through with the push.
pub struct PushDelta {
    /// Changes and tagged states to upload, oldest first.
    pub to_upload: Vec<CS>,
    /// Entries from our cache of the remote that the remote no longer has but
    /// that are in the channel being pushed, as `(remote log index, item)`;
    /// copied from [`RemoteDelta::remote_unrecs`].
    pub remote_unrecs: Vec<(u64, CS)>,
    /// Changes and tagged states the remote has that we don't know about.
    pub unknown_changes: Vec<CS>,
}
296
/// For a [`RemoteRepo`] that's Local, Ssh, or Http
/// (anything other than a LocalChannel),
/// [`RemoteDelta`] contains data about the difference between
/// the "actual" state of the remote ('theirs') and the last version of it
/// that we cached ('ours'). The dichotomy is the last point at which the two
/// were the same. `remote_unrecs` is a list of changes which used to be
/// present in the remote, AND were present in the current channel we're
/// pulling to or pushing from. The significance of that is that if we knew
/// about a certain change but did not pull it, the user won't be notified
/// if it's unrecorded in the remote.
///
/// If the remote we're pulling from or pushing to is a LocalChannel
/// (meaning it's just a different channel of the repo we're already in), then
/// `ours_ge_dichotomy_set`, `theirs_ge_dichotomy`, `theirs_ge_dichotomy_set`
/// and `remote_unrecs` will be empty since they have no meaning. If we're
/// pulling from a LocalChannel, there's no cache to have diverged from, and if
/// we're pushing to a LocalChannel, we're not going to suddenly be surprised
/// by the presence of unknown changes.
///
/// This struct will be created by both a push and pull operation since both
/// need to update the changelist and will at least try to update the local
/// remote cache. For a push, this later gets turned into [`PushDelta`].
pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
    /// Positions (by change hash) the operation is restricted to; empty when
    /// no path restriction applies. NOTE(review): for non-LocalChannel remotes
    /// this comes from `download_changelist_nocache`; confirm its semantics there.
    pub inodes: HashSet<Position<Hash>>,
    /// Changes and tagged states selected for transfer. Despite the name, for
    /// a push with explicitly requested changes this holds those changes.
    pub to_download: Vec<CS>,
    /// Handle on our cached copy of the remote. `None` for a LocalChannel, or
    /// when the remote had no id and we started from scratch.
    pub remote_ref: Option<RemoteRef<T>>,
    /// Our cached entries at or after the dichotomy.
    pub ours_ge_dichotomy_set: HashSet<CS>,
    /// The remote's entries at or after the dichotomy.
    pub theirs_ge_dichotomy_set: HashSet<CS>,
    // Keep the Vec representation around as well so that the notification
    // for unknown changes during a push shows the hashes in order.
    // Entries are `(n, hash, merkle, is_tag)` as written to the cache by
    // `put_remote` / `put_tags`.
    pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
    pub remote_unrecs: Vec<(u64, CS)>,
}
329
330impl<T: RawMutTxnT + 'static> RemoteDelta<MutTxn<T>> {
331    /// Make a [`PushDelta`] from a [`RemoteDelta`]
332    /// when the remote is a [`RemoteRepo::LocalChannel`].
333    pub fn to_local_channel_push(
334        self,
335        remote_channel: &str,
336        txn: &mut MutTxn<T>,
337        path: &[String],
338        channel: &ChannelRef<MutTxn<T>>,
339        repo: &Repository,
340    ) -> Result<PushDelta, anyhow::Error> {
341        let mut to_upload = Vec::new();
342        let inodes = get_local_inodes(txn, channel, repo, path)?;
343
344        for x in txn.reverse_log(&*channel.read(), None)? {
345            let (_, (h, _)) = x?;
346            if let Some(channel) = txn.load_channel(remote_channel)? {
347                let channel = channel.read();
348                let h_int = txn.get_internal(h)?.unwrap();
349                if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
350                    if inodes.is_empty() {
351                        to_upload.push(CS::Change(h.into()))
352                    } else {
353                        for p in inodes.iter() {
354                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
355                                to_upload.push(CS::Change(h.into()));
356                                break;
357                            }
358                        }
359                    }
360                }
361            }
362        }
363        assert!(self.ours_ge_dichotomy_set.is_empty());
364        assert!(self.theirs_ge_dichotomy_set.is_empty());
365        let d = PushDelta {
366            to_upload: to_upload.into_iter().rev().collect(),
367            remote_unrecs: self.remote_unrecs,
368            unknown_changes: Vec::new(),
369        };
370        assert!(d.remote_unrecs.is_empty());
371        Ok(d)
372    }
373
374    /// Make a [`PushDelta`] from a [`RemoteDelta`] when the remote
375    /// is not a LocalChannel.
376    pub fn to_remote_push(
377        self,
378        txn: &mut MutTxn<T>,
379        path: &[String],
380        channel: &ChannelRef<MutTxn<T>>,
381        repo: &Repository,
382    ) -> Result<PushDelta, anyhow::Error> {
383        let mut to_upload = Vec::new();
384        let inodes = get_local_inodes(txn, channel, repo, path)?;
385        if let Some(ref remote_ref) = self.remote_ref {
386            let mut tags: HashSet<Merkle> = HashSet::new();
387            for x in txn.rev_iter_tags(&channel.read().tags, None)? {
388                let (n, m) = x?;
389                debug!("rev_iter_tags {:?} {:?}", n, m);
390                // First, if the remote has exactly the same first n tags, break.
391                if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
392                    if p.b == m.b {
393                        debug!("the remote has tag {:?}", p.a);
394                        break;
395                    }
396                    if p.a != m.a {
397                        // What to do here?  It is possible that state
398                        // `n` is a different state than `m.a` in the
399                        // remote, and is also tagged.
400                    }
401                } else {
402                    tags.insert(m.a.into());
403                }
404            }
405            debug!("tags = {:?}", tags);
406            for x in txn.reverse_log(&*channel.read(), None)? {
407                let (_, (h, m)) = x?;
408                let h_unrecorded = self
409                    .remote_unrecs
410                    .iter()
411                    .any(|(_, hh)| hh == &CS::Change(h.into()));
412                if !h_unrecorded {
413                    if txn.remote_has_state(remote_ref, &m)?.is_some() {
414                        debug!("remote_has_state: {:?}", m);
415                        break;
416                    }
417                }
418                let h_int = txn.get_internal(h)?.unwrap();
419                let h_deser = Hash::from(h);
420                // For elements that are in the uncached remote changes (theirs_ge_dichotomy),
421                // don't put those in to_upload since the remote we're pushing to
422                // already has those changes.
423                if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
424                    && !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
425                {
426                    if inodes.is_empty() {
427                        if tags.remove(&m.into()) {
428                            to_upload.push(CS::State(m.into()));
429                        }
430                        to_upload.push(CS::Change(h_deser));
431                    } else {
432                        for p in inodes.iter() {
433                            if txn.get_touched_files(p, Some(h_int))?.is_some() {
434                                to_upload.push(CS::Change(h_deser));
435                                if tags.remove(&m.into()) {
436                                    to_upload.push(CS::State(m.into()));
437                                }
438                                break;
439                            }
440                        }
441                    }
442                }
443            }
444            for t in tags.iter() {
445                if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
446                    if !txn.is_tagged(&remote_ref.lock().tags, n)? {
447                        to_upload.push(CS::State(*t));
448                    }
449                } else {
450                    debug!("the remote doesn't have state {:?}", t);
451                }
452            }
453        }
454
455        // { h | h \in theirs_ge_dichotomy /\ ~(h \in ours_ge_dichotomy) }
456        // The set of their changes >= dichotomy that aren't
457        // already known to our set of changes after the dichotomy.
458        let mut unknown_changes = Vec::new();
459        for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
460            let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
461            let change = CS::Change(*h);
462            if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
463                unknown_changes.push(change)
464            }
465            if *is_tag {
466                let m_is_known = if let Some(n) = txn
467                    .channel_has_state(txn.states(&*channel.read()), &m.into())
468                    .unwrap()
469                {
470                    txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
471                } else {
472                    false
473                };
474                if !m_is_known {
475                    unknown_changes.push(CS::State(*m))
476                }
477            }
478        }
479
480        Ok(PushDelta {
481            to_upload: to_upload.into_iter().rev().collect(),
482            remote_unrecs: self.remote_unrecs,
483            unknown_changes,
484        })
485    }
486}
487
488/// Create a [`RemoteDelta`] for a [`RemoteRepo::LocalChannel`].
489/// Since this case doesn't have a local remote cache to worry about,
490/// mainly just calculates the `to_download` list of changes.
491pub fn update_changelist_local_channel<T: RawMutTxnT + 'static>(
492    remote_channel: &str,
493    txn: &mut MutTxn<T>,
494    path: &[String],
495    current_channel: &ChannelRef<MutTxn<T>>,
496    repo: &Repository,
497    specific_changes: &[String],
498) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
499    if !specific_changes.is_empty() {
500        let mut to_download = Vec::new();
501        for h in specific_changes {
502            let h = txn.hash_from_prefix(h)?.0;
503            if txn.get_revchanges(current_channel, &h)?.is_none() {
504                to_download.push(CS::Change(h));
505            }
506        }
507        Ok(RemoteDelta {
508            inodes: HashSet::new(),
509            to_download,
510            remote_ref: None,
511            ours_ge_dichotomy_set: HashSet::new(),
512            theirs_ge_dichotomy: Vec::new(),
513            theirs_ge_dichotomy_set: HashSet::new(),
514            remote_unrecs: Vec::new(),
515        })
516    } else {
517        let mut inodes = HashSet::new();
518        let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
519        let mut to_download = Vec::new();
520        inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
521            change: txn.get_external(&x.change).unwrap().unwrap().into(),
522            pos: x.pos,
523        }));
524        if let Some(remote_channel) = txn.load_channel(remote_channel)? {
525            let remote_channel = remote_channel.read();
526            for x in txn.reverse_log(&remote_channel, None)? {
527                let (_, (h, m)) = x?;
528                if txn
529                    .channel_has_state(txn.states(&*current_channel.read()), &m)?
530                    .is_some()
531                {
532                    break;
533                }
534                let h_int = txn.get_internal(h)?.unwrap();
535                if txn
536                    .get_changeset(txn.changes(&*current_channel.read()), h_int)?
537                    .is_none()
538                {
539                    if inodes_.is_empty()
540                        || inodes_.iter().any(|&inode| {
541                            txn.get_rev_touched_files(h_int, Some(&inode))
542                                .unwrap()
543                                .is_some()
544                        })
545                    {
546                        to_download.push(CS::Change(h.into()));
547                    }
548                }
549            }
550        }
551        Ok(RemoteDelta {
552            inodes,
553            to_download,
554            remote_ref: None,
555            ours_ge_dichotomy_set: HashSet::new(),
556            theirs_ge_dichotomy: Vec::new(),
557            theirs_ge_dichotomy_set: HashSet::new(),
558            remote_unrecs: Vec::new(),
559        })
560    }
561}
562
563impl RemoteRepo {
564    fn name(&self) -> Option<&str> {
565        match *self {
566            RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
567            RemoteRepo::Local(ref l) => Some(l.name.as_str()),
568            RemoteRepo::Http(ref h) => Some(h.name.as_str()),
569            RemoteRepo::LocalChannel(_) => None,
570            RemoteRepo::None => unreachable!(),
571        }
572    }
573
574    pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
575        match *self {
576            RemoteRepo::Ssh(ref s) => {
577                if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
578                    Ok(Some(s.name.split_at(sep + 1).1.to_string()))
579                } else {
580                    Ok(Some(s.name.as_str().to_string()))
581                }
582            }
583            RemoteRepo::Local(ref l) => {
584                if let Some(file) = l.root.file_name() {
585                    Ok(Some(
586                        file.to_str()
587                            .context("failed to decode local repository name")?
588                            .to_string(),
589                    ))
590                } else {
591                    Ok(None)
592                }
593            }
594            RemoteRepo::Http(ref h) => {
595                if let Some(name) = libpijul::path::file_name(h.url.path()) {
596                    if !name.trim().is_empty() {
597                        return Ok(Some(name.trim().to_string()));
598                    }
599                }
600                Ok(h.url.host().map(|h| h.to_string()))
601            }
602            RemoteRepo::LocalChannel(_) => Ok(None),
603            RemoteRepo::None => unreachable!(),
604        }
605    }
606
607    pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
608        if let RemoteRepo::Ssh(s) = self {
609            s.finish().await?
610        }
611        Ok(())
612    }
613
614    pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
615        &mut self,
616        txn: &mut T,
617        path: &[String],
618    ) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
619        debug!("update_changelist");
620        let id = if let Some(id) = self.get_id(txn).await? {
621            id
622        } else {
623            return Ok(None);
624        };
625        let mut remote = if let Some(name) = self.name() {
626            txn.open_or_create_remote(id, name)?
627        } else {
628            return Ok(None);
629        };
630        let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
631        debug!("update changelist {:?}", n);
632        let v: Vec<_> = txn
633            .iter_remote(&remote.lock().remote, n)?
634            .filter_map(|k| {
635                debug!("filter_map {:?}", k);
636                let k = (*k.unwrap().0).into();
637                if k >= n { Some(k) } else { None }
638            })
639            .collect();
640        for k in v {
641            debug!("deleting {:?}", k);
642            txn.del_remote(&mut remote, k)?;
643        }
644        let v: Vec<_> = txn
645            .iter_tags(&remote.lock().tags, n)?
646            .filter_map(|k| {
647                debug!("filter_map {:?}", k);
648                let k = (*k.unwrap().0).into();
649                if k >= n { Some(k) } else { None }
650            })
651            .collect();
652        for k in v {
653            debug!("deleting {:?}", k);
654            txn.del_tags(&mut remote.lock().tags, k)?;
655        }
656
657        debug!("deleted");
658        let paths = self.download_changelist(txn, &mut remote, n, path).await?;
659        Ok(Some((paths, remote)))
660    }
661
662    async fn update_changelist_pushpull_from_scratch<T: RawMutTxnT>(
663        &mut self,
664        txn: &mut MutTxn<T>,
665        path: &[String],
666        current_channel: &ChannelRef<MutTxn<T>>,
667    ) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
668        debug!("no id, starting from scratch");
669        let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
670        let mut theirs_ge_dichotomy_set = HashSet::new();
671        let mut to_download = Vec::new();
672        for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
673            theirs_ge_dichotomy_set.insert(CS::Change(*h));
674            if txn.get_revchanges(current_channel, h)?.is_none() {
675                to_download.push(CS::Change(*h));
676            }
677            if *is_tag {
678                let ch = current_channel.read();
679                if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
680                    if !txn.is_tagged(txn.tags(&*ch), n.into())? {
681                        to_download.push(CS::State(*m));
682                    }
683                } else {
684                    to_download.push(CS::State(*m));
685                }
686            }
687        }
688        Ok(RemoteDelta {
689            inodes,
690            remote_ref: None,
691            to_download,
692            ours_ge_dichotomy_set: HashSet::new(),
693            theirs_ge_dichotomy,
694            theirs_ge_dichotomy_set,
695            remote_unrecs: Vec::new(),
696        })
697    }
698
699    /// Creates a [`RemoteDelta`].
700    ///
701    /// IF:
702    ///    the RemoteRepo is a [`RemoteRepo::LocalChannel`], delegate to
703    ///    the simpler method [`update_changelist_local_channel`], returning the
704    ///    `to_download` list of changes.
705    ///
706    /// ELSE:
707    ///    calculate the `to_download` list of changes. Additionally, if there are
708    ///    no remote unrecords, update the local remote cache. If there are remote unrecords,
709    ///    calculate and return information about the difference between our cached version
710    ///    of the remote, and their version of the remote.
711    pub async fn update_changelist_pushpull<T: RawMutTxnT + 'static>(
712        &mut self,
713        txn: &mut MutTxn<T>,
714        path: &[String],
715        current_channel: &ChannelRef<MutTxn<T>>,
716        force_cache: Option<bool>,
717        repo: &Repository,
718        specific_changes: &[String],
719        is_pull: bool,
720    ) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
721        debug!("update_changelist_pushpull");
722        if let RemoteRepo::LocalChannel(c) = self {
723            return update_changelist_local_channel(
724                c,
725                txn,
726                path,
727                current_channel,
728                repo,
729                specific_changes,
730            );
731        }
732
733        let id = if let Some(id) = self.get_id(txn).await? {
734            debug!("id = {:?}", id);
735            id
736        } else {
737            return self
738                .update_changelist_pushpull_from_scratch(txn, path, current_channel)
739                .await;
740        };
741        let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
742        let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
743        let ours_ge_dichotomy: Vec<(u64, CS)> = txn
744            .iter_remote(&remote_ref.lock().remote, dichotomy_n)?
745            .filter_map(|k| {
746                debug!("filter_map {:?}", k);
747                match k.unwrap() {
748                    (k, libpijul::pristine::Pair { a: hash, .. }) => {
749                        let (k, hash) = (u64::from(*k), Hash::from(*hash));
750                        if k >= dichotomy_n {
751                            Some((k, CS::Change(hash)))
752                        } else {
753                            None
754                        }
755                    }
756                }
757            })
758            .collect();
759        let (inodes, theirs_ge_dichotomy) =
760            self.download_changelist_nocache(dichotomy_n, path).await?;
761        debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
762        let ours_ge_dichotomy_set = ours_ge_dichotomy
763            .iter()
764            .map(|(_, h)| h)
765            .copied()
766            .collect::<HashSet<CS>>();
767        let mut theirs_ge_dichotomy_set = HashSet::new();
768        for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
769            theirs_ge_dichotomy_set.insert(CS::Change(*h));
770            if *is_tag {
771                theirs_ge_dichotomy_set.insert(CS::State(*m));
772            }
773        }
774
775        // remote_unrecs = {x: (u64, Hash) | x \in ours_ge_dichot /\ ~(x \in theirs_ge_dichot) /\ x \in current_channel }
776        let remote_unrecs = remote_unrecs(
777            txn,
778            current_channel,
779            &ours_ge_dichotomy,
780            &theirs_ge_dichotomy_set,
781        )?;
782        let should_cache = if let Some(true) = force_cache {
783            true
784        } else {
785            remote_unrecs.is_empty()
786        };
787        debug!(
788            "should_cache = {:?} {:?} {:?}",
789            force_cache, remote_unrecs, should_cache
790        );
791        if should_cache {
792            use libpijul::ChannelMutTxnT;
793            for (k, t) in ours_ge_dichotomy.iter().copied() {
794                match t {
795                    CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
796                    CS::Change(_) => {
797                        txn.del_remote(&mut remote_ref, k)?;
798                    }
799                }
800            }
801            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
802                debug!("theirs: {:?} {:?} {:?}", n, h, m);
803                txn.put_remote(&mut remote_ref, n, (h, m))?;
804                if is_tag {
805                    txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
806                }
807            }
808        }
809        if !specific_changes.is_empty() {
810            // Here, the user only wanted to push/pull specific changes
811            let to_download = specific_changes
812                .iter()
813                .map(|h| {
814                    if is_pull {
815                        {
816                            if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
817                                return Ok(CS::State(t.0));
818                            }
819                        }
820                        Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
821                    } else {
822                        if let Ok(t) = txn.state_from_prefix(&current_channel.read().states, h) {
823                            Ok(CS::State(t.0))
824                        } else {
825                            Ok(CS::Change(txn.hash_from_prefix(h)?.0))
826                        }
827                    }
828                })
829                .collect::<Result<Vec<_>, anyhow::Error>>();
830            Ok(RemoteDelta {
831                inodes,
832                remote_ref: Some(remote_ref),
833                to_download: to_download?,
834                ours_ge_dichotomy_set,
835                theirs_ge_dichotomy,
836                theirs_ge_dichotomy_set,
837                remote_unrecs,
838            })
839        } else {
840            let mut to_download: Vec<CS> = Vec::new();
841            let mut to_download_ = HashSet::new();
842            for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
843                let (_, p) = x?;
844                let h: Hash = p.a.into();
845                if txn
846                    .channel_has_state(txn.states(&current_channel.read()), &p.b)
847                    .unwrap()
848                    .is_some()
849                {
850                    break;
851                }
852                if txn.get_revchanges(&current_channel, &h).unwrap().is_none() {
853                    let h = CS::Change(h);
854                    if to_download_.insert(h.clone()) {
855                        to_download.push(h);
856                    }
857                }
858            }
859
860            // The patches in theirs_ge_dichotomy are unknown to us,
861            // download them.
862            for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
863                debug!(
864                    "update_changelist_pushpull line {}, {:?} {:?}",
865                    line!(),
866                    n,
867                    h
868                );
869                // In all cases, add this new change/state/tag to `to_download`.
870                let ch = CS::Change(*h);
871                if txn.get_revchanges(&current_channel, h).unwrap().is_none() {
872                    if to_download_.insert(ch.clone()) {
873                        to_download.push(ch.clone());
874                    }
875                    if *is_tag {
876                        to_download.push(CS::State(*m));
877                    }
878                } else if *is_tag {
879                    let has_tag = if let Some(n) =
880                        txn.channel_has_state(txn.states(&current_channel.read()), &m.into())?
881                    {
882                        txn.is_tagged(txn.tags(&current_channel.read()), n.into())?
883                    } else {
884                        false
885                    };
886                    if !has_tag {
887                        to_download.push(CS::State(*m));
888                    }
889                }
890                // Additionally, if there are no remote unrecords
891                // (i.e. if `should_cache`), cache.
892                if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
893                    use libpijul::ChannelMutTxnT;
894                    txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
895                    if *is_tag {
896                        let mut rem = remote_ref.lock();
897                        txn.put_tags(&mut rem.tags, *n, m)?;
898                    }
899                }
900            }
901            Ok(RemoteDelta {
902                inodes,
903                remote_ref: Some(remote_ref),
904                to_download,
905                ours_ge_dichotomy_set,
906                theirs_ge_dichotomy,
907                theirs_ge_dichotomy_set,
908                remote_unrecs,
909            })
910        }
911    }
912
913    /// Get the list of the remote's changes that come after `from: u64`.
914    /// Instead of immediately updating the local cache of the remote, return
915    /// the change info without changing the cache.
916    pub async fn download_changelist_nocache(
917        &mut self,
918        from: u64,
919        paths: &[String],
920    ) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
921        let mut v = Vec::new();
922        let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, m2| {
923            debug!("no cache: {:?}", h);
924            Ok(v.push((n, h, m, m2)))
925        };
926        let r = match *self {
927            RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
928            RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
929            RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
930            RemoteRepo::LocalChannel(_) => HashSet::new(),
931            RemoteRepo::None => unreachable!(),
932        };
933        Ok((r, v))
934    }
935
    /// Uses a binary search to find the integer identifier of the last point
    /// at which our locally cached version of the remote was the same as the 'actual'
    /// state of the remote.
    ///
    /// Both the state and the last tag state must agree for a position to
    /// count as "in common". The value returned is the position to resume
    /// downloading from:
    /// - `0` when the local cache of the remote is empty;
    /// - `b + 1` when the last cached position `b` still matches the remote;
    /// - `a + 1` when the search pins down a matching position `a`;
    /// - otherwise the lower bound `a` reached by the search.
    async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
        &mut self,
        txn: &T,
        remote: &libpijul::pristine::Remote<T>,
    ) -> Result<u64, anyhow::Error> {
        // Search interval is [a, b]; `b` starts at the last cached position.
        let mut a = 0;
        let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
            debug!("dichotomy_changelist: {:?} {:?}", u, v);
            (u, (&v.b).into())
        } else {
            debug!("the local copy of the remote has no changes");
            return Ok(0);
        };
        // State of the last cached tag, or the zero Merkle if no tag is cached.
        let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
            v.into()
        } else {
            Merkle::zero()
        };
        debug!("last_state: {:?} {:?}", state, last_statet);
        // Fast path: compare the remote's state at our last cached position.
        if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
            debug!("remote last_state: {:?} {:?}", s, st);
            if s == state && st == last_statet {
                // The local list is already up to date.
                return Ok(b + 1);
            }
        }
        // Else, find the last state we have in common with the
        // remote, it might be older than the last known state (if
        // changes were unrecorded on the remote).
        while a < b {
            let mid = (a + b) / 2;
            // `mid` is replaced by the position of the cached entry returned
            // here; presumably the first cached entry at or after `mid`, by
            // analogy with `get_remote_tag` below — TODO confirm.
            // NOTE(review): `unwrap` panics if no such entry exists; it is
            // expected to exist because `b` itself is a cached position.
            let (mid, state) = {
                let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
                (a, b.b)
            };
            let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
                // There's still a tag at position >= mid in the
                // sequence.
                b.b.into()
            } else {
                // No tag at or after mid: the last tag state,
                // `last_statet`, is the right answer in that case.
                last_statet
            };

            let remote_state = self.get_state(txn, Some(mid)).await?;
            debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
            if let Some((_, remote_state, remote_statet)) = remote_state {
                if remote_state == state && remote_statet == statet {
                    // `mid` matches: move the lower bound up, or stop if the
                    // interval cannot shrink any further.
                    if a == mid {
                        return Ok(a + 1);
                    } else {
                        a = mid;
                        continue;
                    }
                }
            }
            // `mid` does not match (or the remote has no state there):
            // move the upper bound down, or stop if it cannot move.
            if b == mid {
                break;
            } else {
                b = mid
            }
        }
        // NOTE(review): if `a` was set from a successful comparison, this
        // returns `a` rather than `a + 1`, so that entry is fetched again —
        // presumably harmless, confirm.
        Ok(a)
    }
1004
1005    async fn get_state<T: libpijul::TxnTExt>(
1006        &mut self,
1007        txn: &T,
1008        mid: Option<u64>,
1009    ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
1010        match *self {
1011            RemoteRepo::Local(ref mut l) => l.get_state(mid),
1012            RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
1013            RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
1014            RemoteRepo::LocalChannel(ref channel) => {
1015                if let Some(channel) = txn.load_channel(&channel)? {
1016                    local::get_state(txn, &channel, mid)
1017                } else {
1018                    Ok(None)
1019                }
1020            }
1021            RemoteRepo::None => unreachable!(),
1022        }
1023    }
1024
1025    /// This method might return `Ok(None)` in some cases, for example
1026    /// if the remote wants to indicate not to store a cache. This is
1027    /// the case for Nest channels, for example.
1028    async fn get_id<T: libpijul::TxnTExt + 'static>(
1029        &mut self,
1030        txn: &T,
1031    ) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
1032        match *self {
1033            RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
1034            RemoteRepo::Ssh(ref mut s) => s.get_id().await,
1035            RemoteRepo::Http(ref h) => h.get_id().await,
1036            RemoteRepo::LocalChannel(ref channel) => {
1037                if let Some(channel) = txn.load_channel(&channel)? {
1038                    Ok(txn.id(&*channel.read()).cloned())
1039                } else {
1040                    Err(anyhow::anyhow!(
1041                        "Unable to retrieve RemoteId for LocalChannel remote"
1042                    ))
1043                }
1044            }
1045            RemoteRepo::None => unreachable!(),
1046        }
1047    }
1048
1049    pub async fn archive<W: std::io::Write + Send + 'static>(
1050        &mut self,
1051        prefix: Option<String>,
1052        state: Option<(Merkle, &[Hash])>,
1053        umask: u16,
1054        w: W,
1055    ) -> Result<u64, anyhow::Error> {
1056        match *self {
1057            RemoteRepo::Local(ref mut l) => {
1058                debug!("archiving local repo");
1059                let changes = libpijul::changestore::filesystem::FileSystem::from_root(
1060                    &l.root,
1061                    pijul_repository::max_files(),
1062                );
1063                let working_copy =
1064                    libpijul::working_copy::filesystem::FileSystem::from_root(&l.root);
1065                let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
1066                let conflicts = if let Some((state, extra)) = state {
1067                    let txn = l.pristine.arc_txn_begin()?;
1068                    let channel = {
1069                        let txn = txn.read();
1070                        txn.load_channel(&l.channel)?.unwrap()
1071                    };
1072                    txn.archive_with_state(
1073                        &changes,
1074                        &channel,
1075                        &state,
1076                        extra,
1077                        &mut tarball,
1078                        0,
1079                        &working_copy,
1080                    )?
1081                } else {
1082                    let txn = l.pristine.arc_txn_begin()?;
1083                    let channel = {
1084                        let txn = txn.read();
1085                        txn.load_channel(&l.channel)?.unwrap()
1086                    };
1087                    txn.archive::<_, _, libpijul::working_copy::filesystem::FileSystem>(
1088                        &changes,
1089                        &channel,
1090                        &mut tarball,
1091                    )?
1092                };
1093                Ok(conflicts.len() as u64)
1094            }
1095            RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
1096            RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
1097            RemoteRepo::LocalChannel(_) => unreachable!(),
1098            RemoteRepo::None => unreachable!(),
1099        }
1100    }
1101
1102    async fn download_changelist<T: MutTxnTExt>(
1103        &mut self,
1104        txn: &mut T,
1105        remote: &mut RemoteRef<T>,
1106        from: u64,
1107        paths: &[String],
1108    ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
1109        let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
1110            let (ref mut txn, ref mut remote) = *a;
1111            txn.put_remote(remote, n, (h, m))?;
1112            if is_tag {
1113                txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
1114            }
1115            Ok(())
1116        };
1117        match *self {
1118            RemoteRepo::Local(ref mut l) => {
1119                l.download_changelist(f, &mut (txn, remote), from, paths)
1120            }
1121            RemoteRepo::Ssh(ref mut s) => {
1122                s.download_changelist(f, &mut (txn, remote), from, paths)
1123                    .await
1124            }
1125            RemoteRepo::Http(ref h) => {
1126                h.download_changelist(f, &mut (txn, remote), from, paths)
1127                    .await
1128            }
1129            RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
1130            RemoteRepo::None => unreachable!(),
1131        }
1132    }
1133
1134    pub async fn upload_changes<T: MutTxnTExt + 'static>(
1135        &mut self,
1136        txn: &mut T,
1137        local: PathBuf,
1138        to_channel: Option<&str>,
1139        changes: &[CS],
1140    ) -> Result<(), anyhow::Error> {
1141        let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
1142
1143        match self {
1144            RemoteRepo::Local(l) => l.upload_changes(upload_bar, local, to_channel, changes)?,
1145            RemoteRepo::Ssh(s) => {
1146                s.upload_changes(upload_bar, local, to_channel, changes)
1147                    .await?
1148            }
1149            &mut RemoteRepo::Http(ref h) => {
1150                h.upload_changes(upload_bar, local, to_channel, changes)
1151                    .await?
1152            }
1153            &mut RemoteRepo::LocalChannel(ref channel) => {
1154                let mut channel = txn.open_or_create_channel(channel)?;
1155                let store = libpijul::changestore::filesystem::FileSystem::from_changes(
1156                    local,
1157                    pijul_repository::max_files(),
1158                );
1159                local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
1160            }
1161            RemoteRepo::None => unreachable!(),
1162        }
1163        Ok(())
1164    }
1165
1166    /// Start (and possibly complete) the download of a change.
1167    pub async fn download_changes(
1168        &mut self,
1169        progress_bar: ProgressBar,
1170        hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
1171        send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
1172        path: &mut PathBuf,
1173        full: bool,
1174    ) -> Result<bool, anyhow::Error> {
1175        debug!("download_changes");
1176        match *self {
1177            RemoteRepo::Local(ref mut l) => {
1178                l.download_changes(progress_bar, hashes, send, path).await?
1179            }
1180            RemoteRepo::Ssh(ref mut s) => {
1181                s.download_changes(progress_bar, hashes, send, path, full)
1182                    .await?
1183            }
1184            RemoteRepo::Http(ref mut h) => {
1185                h.download_changes(progress_bar, hashes, send, path, full)
1186                    .await?
1187            }
1188            RemoteRepo::LocalChannel(_) => {
1189                while let Some(c) = hashes.recv().await {
1190                    send.send((c, true)).await?;
1191                }
1192            }
1193            RemoteRepo::None => unreachable!(),
1194        }
1195        Ok(true)
1196    }
1197
1198    pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
1199        &mut self,
1200        repo: &mut Repository,
1201        remote: &RemoteRef<T>,
1202    ) -> Result<(), anyhow::Error> {
1203        debug!("Downloading identities");
1204        let mut id_path = repo.path.clone();
1205        id_path.push(DOT_DIR);
1206        id_path.push("identities");
1207        let rev = None;
1208        let r = match *self {
1209            RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
1210            RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
1211            RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
1212            RemoteRepo::LocalChannel(_) => 0,
1213            RemoteRepo::None => unreachable!(),
1214        };
1215        remote.set_id_revision(r);
1216        Ok(())
1217    }
1218
1219    pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
1220        match *self {
1221            RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
1222            RemoteRepo::Http(ref mut h) => h.prove(key).await,
1223            RemoteRepo::None => unreachable!(),
1224            _ => Ok(()),
1225        }
1226    }
1227
1228    pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1229        &mut self,
1230        repo: &mut Repository,
1231        txn: &mut T,
1232        channel: &mut ChannelRef<T>,
1233        to_apply: &[CS],
1234        inodes: &HashSet<Position<Hash>>,
1235        do_apply: bool,
1236    ) -> Result<Vec<CS>, anyhow::Error> {
1237        let apply_len = to_apply.len() as u64;
1238        let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
1239        let apply_bar = if do_apply {
1240            Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
1241        } else {
1242            None
1243        };
1244
1245        let (mut send, recv) = tokio::sync::mpsc::channel(100);
1246
1247        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1248        let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
1249        let mut change_path_ = repo.path.clone();
1250        change_path_.push(DOT_DIR);
1251        change_path_.push("changes");
1252        let cloned_download_bar = download_bar.clone();
1253        let t = tokio::spawn(async move {
1254            self_
1255                .download_changes(
1256                    cloned_download_bar,
1257                    &mut hash_recv,
1258                    &mut send,
1259                    &mut change_path_,
1260                    false,
1261                )
1262                .await?;
1263
1264            Ok::<_, anyhow::Error>(self_)
1265        });
1266
1267        let mut change_path_ = repo.changes_dir.clone();
1268        let mut waiting = 0;
1269        let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1270
1271        let mut asked = HashSet::new();
1272        for h in to_apply {
1273            debug!("to_apply {:?}", h);
1274            match h {
1275                CS::Change(h) => {
1276                    libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
1277                }
1278                CS::State(h) => {
1279                    libpijul::changestore::filesystem::push_tag_filename(&mut change_path_, h);
1280                }
1281            }
1282            asked.insert(*h);
1283            hash_send.send(*h)?;
1284            waiting += 1;
1285            libpijul::changestore::filesystem::pop_filename(&mut change_path_);
1286        }
1287
1288        let u = self
1289            .download_changes_rec(
1290                repo,
1291                hash_send,
1292                recv,
1293                send_ready,
1294                download_bar,
1295                waiting,
1296                asked,
1297            )
1298            .await?;
1299
1300        let mut ws = libpijul::ApplyWorkspace::new();
1301        let mut to_apply_inodes = HashSet::new();
1302        while let Some(h) = recv_ready.recv().await {
1303            debug!("to_apply: {:?}", h);
1304            let touches_inodes = inodes.is_empty()
1305                || {
1306                    debug!("inodes = {:?}", inodes);
1307                    use libpijul::changestore::ChangeStore;
1308                    if let CS::Change(ref h) = h {
1309                        let changes = repo.changes.get_changes(h)?;
1310                        changes.iter().any(|c| {
1311                            c.iter().any(|c| {
1312                                let inode = c.inode();
1313                                debug!("inode = {:?}", inode);
1314                                inodes.contains(&Position {
1315                                    change: inode.change.unwrap_or(*h),
1316                                    pos: inode.pos,
1317                                })
1318                            })
1319                        })
1320                    } else {
1321                        false
1322                    }
1323                }
1324                || { inodes.iter().any(|i| CS::Change(i.change) == h) };
1325
1326            if touches_inodes {
1327                to_apply_inodes.insert(h);
1328            } else {
1329                continue;
1330            }
1331
1332            if let Some(apply_bar) = apply_bar.clone() {
1333                info!("Applying {:?}", h);
1334                apply_bar.inc(1);
1335                debug!("apply");
1336                if let CS::Change(h) = h {
1337                    let mut channel = channel.write();
1338                    txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
1339                }
1340                debug!("applied");
1341            } else {
1342                debug!("not applying {:?}", h)
1343            }
1344        }
1345
1346        let mut result = Vec::with_capacity(to_apply_inodes.len());
1347        for h in to_apply {
1348            if to_apply_inodes.contains(&h) {
1349                result.push(*h)
1350            }
1351        }
1352
1353        debug!("finished");
1354        debug!("waiting for spawned process");
1355        *self = t.await??;
1356        u.await??;
1357        Ok(result)
1358    }
1359
1360    async fn download_changes_rec(
1361        &mut self,
1362        repo: &mut Repository,
1363        send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
1364        mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
1365        send_ready: tokio::sync::mpsc::Sender<CS>,
1366        progress_bar: ProgressBar,
1367        mut waiting: usize,
1368        mut asked: HashSet<CS>,
1369    ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
1370        let mut dep_path = repo.changes_dir.clone();
1371        let changes = repo.changes.clone();
1372        let t = tokio::spawn(async move {
1373            if waiting == 0 {
1374                return Ok(());
1375            }
1376            let mut ready = Vec::new();
1377            while let Some((hash, follow)) = recv_signal.recv().await {
1378                debug!("received {:?} {:?}", hash, follow);
1379                if let CS::Change(hash) = hash {
1380                    waiting -= 1;
1381                    if follow {
1382                        use libpijul::changestore::ChangeStore;
1383                        let mut needs_dep = false;
1384                        for dep in changes.get_dependencies(&hash)? {
1385                            let dep: libpijul::pristine::Hash = dep;
1386
1387                            libpijul::changestore::filesystem::push_filename(&mut dep_path, &dep);
1388                            let has_dep = std::fs::metadata(&dep_path).is_ok();
1389                            libpijul::changestore::filesystem::pop_filename(&mut dep_path);
1390
1391                            if !has_dep {
1392                                needs_dep = true;
1393                                if asked.insert(CS::Change(dep)) {
1394                                    progress_bar.inc(1);
1395                                    send_hash.send(CS::Change(dep))?;
1396                                    waiting += 1
1397                                }
1398                            }
1399                        }
1400
1401                        if !needs_dep {
1402                            send_ready.send(CS::Change(hash)).await?;
1403                        } else {
1404                            ready.push(CS::Change(hash))
1405                        }
1406                    } else {
1407                        send_ready.send(CS::Change(hash)).await?;
1408                    }
1409                }
1410                if waiting == 0 {
1411                    break;
1412                }
1413            }
1414            info!("waiting loop done");
1415            for r in ready {
1416                send_ready.send(r).await?;
1417            }
1418            std::mem::drop(recv_signal);
1419            Ok(())
1420        });
1421        Ok(t)
1422    }
1423
1424    pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1425        &mut self,
1426        repo: &mut Repository,
1427        txn: &mut T,
1428        channel: &mut ChannelRef<T>,
1429        tag: &[Hash],
1430    ) -> Result<(), anyhow::Error> {
1431        let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1432        let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
1433        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1434        let mut change_path_ = repo.changes_dir.clone();
1435        let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
1436        let cloned_download_bar = download_bar.clone();
1437
1438        let t = tokio::spawn(async move {
1439            self_
1440                .download_changes(
1441                    cloned_download_bar,
1442                    &mut recv_hash,
1443                    &mut send_signal,
1444                    &mut change_path_,
1445                    false,
1446                )
1447                .await?;
1448            Ok(self_)
1449        });
1450
1451        let mut waiting = 0;
1452        let mut asked = HashSet::new();
1453        for &h in tag.iter() {
1454            waiting += 1;
1455            send_hash.send(CS::Change(h))?;
1456            asked.insert(CS::Change(h));
1457        }
1458
1459        let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1460
1461        let u = self
1462            .download_changes_rec(
1463                repo,
1464                send_hash,
1465                recv_signal,
1466                send_ready,
1467                download_bar,
1468                waiting,
1469                asked,
1470            )
1471            .await?;
1472
1473        let mut hashes = Vec::new();
1474        let mut ws = libpijul::ApplyWorkspace::new();
1475        {
1476            let mut channel_ = channel.write();
1477            while let Some(hash) = recv_ready.recv().await {
1478                if let CS::Change(ref hash) = hash {
1479                    txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
1480                }
1481                hashes.push(hash);
1482            }
1483        }
1484        let r: Result<_, anyhow::Error> = t.await?;
1485        *self = r?;
1486        u.await??;
1487        self.complete_changes(repo, txn, channel, &hashes, false)
1488            .await?;
1489        Ok(())
1490    }
1491
1492    pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1493        &mut self,
1494        repo: &mut Repository,
1495        txn: &mut T,
1496        channel: &mut ChannelRef<T>,
1497        state: Merkle,
1498    ) -> Result<(), anyhow::Error> {
1499        let id = if let Some(id) = self.get_id(txn).await? {
1500            id
1501        } else {
1502            return Ok(());
1503        };
1504        self.update_changelist(txn, &[]).await?;
1505        let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
1506        let mut to_pull = Vec::new();
1507        let mut found = false;
1508        for x in txn.iter_remote(&remote.lock().remote, 0)? {
1509            let (n, p) = x?;
1510            debug!("{:?} {:?}", n, p);
1511            to_pull.push(CS::Change(p.a.into()));
1512            if p.b == state {
1513                found = true;
1514                break;
1515            }
1516        }
1517        if !found {
1518            bail!("State not found: {:?}", state)
1519        }
1520        self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
1521            .await?;
1522        self.update_identities(repo, &remote).await?;
1523
1524        self.complete_changes(repo, txn, channel, &to_pull, false)
1525            .await?;
1526        Ok(())
1527    }
1528
1529    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
1530        &mut self,
1531        repo: &pijul_repository::Repository,
1532        txn: &T,
1533        local_channel: &mut ChannelRef<T>,
1534        changes: &[CS],
1535        full: bool,
1536    ) -> Result<(), anyhow::Error> {
1537        debug!("complete changes {:?}", changes);
1538        use libpijul::changestore::ChangeStore;
1539        let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1540        let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
1541        let mut self_ = std::mem::replace(self, RemoteRepo::None);
1542        let mut changes_dir = repo.changes_dir.clone();
1543
1544        let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
1545        let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
1546        let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
1547            tokio::spawn(async move {
1548                self_
1549                    .download_changes(
1550                        download_bar,
1551                        &mut recv_hash,
1552                        &mut send_sig,
1553                        &mut changes_dir,
1554                        true,
1555                    )
1556                    .await?;
1557                Ok::<_, anyhow::Error>(self_)
1558            });
1559
1560        for c in changes {
1561            let c = if let CS::Change(c) = c { c } else { continue };
1562            let sc = c.into();
1563            if repo
1564                .changes
1565                .has_contents(*c, txn.get_internal(&sc)?.cloned())
1566            {
1567                debug!("has contents {:?}", c);
1568                continue;
1569            }
1570            if full {
1571                debug!("sending send_hash");
1572                send_hash.send(CS::Change(*c))?;
1573                debug!("sent");
1574                continue;
1575            }
1576            let change = if let Some(&i) = txn.get_internal(&sc)? {
1577                i
1578            } else {
1579                debug!("could not find internal for {:?}", sc);
1580                continue;
1581            };
1582            // Check if at least one non-empty vertex from c is still alive.
1583            let v = libpijul::pristine::Vertex {
1584                change,
1585                start: libpijul::pristine::ChangePosition(0u64.into()),
1586                end: libpijul::pristine::ChangePosition(0u64.into()),
1587            };
1588            let channel = local_channel.read();
1589            let graph = txn.graph(&channel);
1590            for x in txn.iter_graph(graph, Some(&v))? {
1591                let (v, e) = x?;
1592                if v.change > change {
1593                    break;
1594                } else if e.flag().is_alive_parent() {
1595                    send_hash.send(CS::Change(*c))?;
1596                    break;
1597                }
1598            }
1599        }
1600        debug!("dropping send_hash");
1601        std::mem::drop(send_hash);
1602        while recv_sig.recv().await.is_some() {}
1603        *self = t.await??;
1604        Ok(())
1605    }
1606
1607    pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1608        &mut self,
1609        repo: &mut Repository,
1610        txn: &mut T,
1611        local_channel: &mut ChannelRef<T>,
1612        path: &[String],
1613    ) -> Result<(), anyhow::Error> {
1614        let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
1615            x
1616        } else {
1617            bail!("Channel not found")
1618        };
1619        let mut pullable = Vec::new();
1620        {
1621            let rem = remote_changes.lock();
1622            for x in txn.iter_remote(&rem.remote, 0)? {
1623                let (_, p) = x?;
1624                pullable.push(CS::Change(p.a.into()))
1625            }
1626        }
1627        self.pull(repo, txn, local_channel, &pullable, &inodes, true)
1628            .await?;
1629        self.update_identities(repo, &remote_changes).await?;
1630
1631        self.complete_changes(repo, txn, local_channel, &pullable, false)
1632            .await?;
1633        Ok(())
1634    }
1635}
1636
1637static CHANGELIST_LINE: LazyLock<Regex> = LazyLock::new(|| {
1638    Regex::new(r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#)
1639        .unwrap()
1640});
1641static PATHS_LINE: LazyLock<Regex> =
1642    LazyLock::new(|| Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap());
1643
/// One decoded line of a remote listing, as returned by `parse_line`.
enum ListLine {
    /// A changelist entry `<num>.<hash>.<merkle>`, optionally followed by `.`.
    Change {
        /// The leading number of the entry.
        /// NOTE(review): presumably the entry's position in the remote's log — confirm.
        n: u64,
        /// The change hash decoded from the second field.
        h: Hash,
        /// The Merkle value decoded from the third field.
        m: Merkle,
        /// `true` when the line carries the optional trailing `.`.
        tag: bool,
    },
    /// A `<hash>.<num>` entry: byte position `num` inside change `hash`.
    Position(Position<Hash>),
    /// The remainder of a line that started with `error:` (prefix removed).
    Error(String),
}
1654
1655fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
1656    debug!("data = {:?}", data);
1657    if let Some(caps) = CHANGELIST_LINE.captures(data) {
1658        if let (Some(h), Some(m)) = (
1659            Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
1660            Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
1661        ) {
1662            return Ok(ListLine::Change {
1663                n: caps.name("num").unwrap().as_str().parse().unwrap(),
1664                h,
1665                m,
1666                tag: caps.name("tag").is_some(),
1667            });
1668        }
1669    }
1670    if data.starts_with("error:") {
1671        return Ok(ListLine::Error(data.split_at(6).1.to_string()));
1672    }
1673    if let Some(caps) = PATHS_LINE.captures(data) {
1674        return Ok(ListLine::Position(Position {
1675            change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
1676            pos: ChangePosition(
1677                caps.name("num")
1678                    .unwrap()
1679                    .as_str()
1680                    .parse::<u64>()
1681                    .unwrap()
1682                    .into(),
1683            ),
1684        }));
1685    }
1686    debug!("offending line: {:?}", data);
1687    bail!("Protocol error")
1688}
1689
1690/// Compare the remote set (theirs_ge_dichotomy) with our current
1691/// version of that (ours_ge_dichotomy) and return the changes in our
1692/// current version that are not in the remote anymore.
1693fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
1694    txn: &T,
1695    current_channel: &ChannelRef<T>,
1696    ours_ge_dichotomy: &[(u64, CS)],
1697    theirs_ge_dichotomy_set: &HashSet<CS>,
1698) -> Result<Vec<(u64, CS)>, anyhow::Error> {
1699    let mut remote_unrecs = Vec::new();
1700    for (n, hash) in ours_ge_dichotomy {
1701        debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
1702        if theirs_ge_dichotomy_set.contains(hash) {
1703            // If this change is still present in the remote, skip
1704            debug!("still present");
1705            continue;
1706        } else {
1707            let has_it = match hash {
1708                CS::Change(hash) => txn.get_revchanges(&current_channel, &hash)?.is_some(),
1709                CS::State(state) => {
1710                    let ch = current_channel.read();
1711                    if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
1712                        txn.is_tagged(txn.tags(&*ch), n.into())?
1713                    } else {
1714                        false
1715                    }
1716                }
1717            };
1718            if has_it {
1719                remote_unrecs.push((*n, *hash))
1720            } else {
1721                // If this unrecord wasn't in our current channel, skip
1722                continue;
1723            }
1724        }
1725    }
1726    Ok(remote_unrecs)
1727}