1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use anyhow::{bail, Context};
7use async_trait::async_trait;
8use lazy_static::lazy_static;
9use libpijul::pristine::{
10 sanakirja::MutTxn, Base32, ChangeId, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, RemoteRef,
11 TxnT,
12};
13use libpijul::DOT_DIR;
14use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
15use log::{debug, info};
16
17use pijul_config::*;
18use pijul_identity::Complete;
19use pijul_repository::*;
20
21pub mod ssh;
22use ssh::*;
23
24pub mod local;
25use local::*;
26
27pub mod http;
28use http::*;
29
30use pijul_interaction::{
31 ProgressBar, Spinner, APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, UPLOAD_MESSAGE,
32};
33
/// Version number of the remote protocol.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// exchanged with the remote by the transport modules — confirm.
pub const PROTOCOL_VERSION: usize = 3;
35
/// A handle on a remote repository, one variant per transport.
pub enum RemoteRepo {
    /// A repository on the local filesystem, opened through its pristine.
    Local(Local),
    /// A repository reached over SSH.
    Ssh(Ssh),
    /// A repository reached over HTTP(S).
    Http(Http),
    /// Another channel of the current repository; the payload is the channel
    /// name. Produced by `unknown_remote` when the remote path is the
    /// repository itself.
    LocalChannel(String),
    /// Placeholder left behind while the real value is temporarily moved out
    /// (see `std::mem::replace` in `pull`). Every method treats it as
    /// unreachable.
    None,
}
43
/// An item that can be transferred between repositories: either a change or a
/// state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CS {
    /// A change, identified by its hash.
    Change(Hash),
    /// A state, identified by its Merkle hash. In this file states are only
    /// transferred for tags (they are stored via `push_tag_filename`).
    State(Merkle),
}
49
50pub async fn repository(
51 repo: &Repository,
52 self_path: Option<&Path>,
53 user: Option<&str>,
55 name: &str,
56 channel: &str,
57 no_cert_check: bool,
58 with_path: bool,
59) -> Result<RemoteRepo, anyhow::Error> {
60 if let Some(name) = repo.config.remotes.iter().find(|e| e.name() == name) {
61 name.to_remote(channel, no_cert_check, with_path).await
62 } else {
63 unknown_remote(self_path, user, name, channel, no_cert_check, with_path).await
64 }
65}
66
67pub async fn prove(
70 identity: &Complete,
71 origin: Option<&str>,
72 no_cert_check: bool,
73) -> Result<(), anyhow::Error> {
74 let remote = origin.unwrap_or(&identity.config.author.origin);
75 let mut stderr = std::io::stderr();
76 writeln!(
77 stderr,
78 "Linking identity `{}` with {}@{}",
79 &identity.name, &identity.config.author.username, remote
80 )?;
81
82 let mut remote = if let Ok(repo) = Repository::find_root(None) {
83 repository(
84 &repo,
85 None,
86 Some(&identity.config.author.username),
87 &remote,
88 libpijul::DEFAULT_CHANNEL,
89 no_cert_check,
90 false,
91 )
92 .await?
93 } else {
94 unknown_remote(
95 None,
96 Some(&identity.config.author.username),
97 &remote,
98 libpijul::DEFAULT_CHANNEL,
99 no_cert_check,
100 false,
101 )
102 .await?
103 };
104
105 let (key, _password) = identity
106 .credentials
107 .clone()
108 .unwrap()
109 .decrypt(&identity.name)?;
110 remote.prove(key).await?;
111
112 Ok(())
113}
114
/// Conversion of a remote description into a usable [`RemoteRepo`].
#[async_trait]
pub trait ToRemote {
    /// Builds the [`RemoteRepo`] for `channel`.
    ///
    /// `no_cert_check` is forwarded to the HTTP client builder
    /// (`danger_accept_invalid_certs`) and `with_path` to `ssh_remote` by the
    /// implementation in this file.
    async fn to_remote(
        &self,
        channel: &str,
        no_cert_check: bool,
        with_path: bool,
    ) -> Result<RemoteRepo, anyhow::Error>;
}
124
125#[async_trait]
126impl ToRemote for RemoteConfig {
127 async fn to_remote(
128 &self,
129 channel: &str,
130 no_cert_check: bool,
131 with_path: bool,
132 ) -> Result<RemoteRepo, anyhow::Error> {
133 match self {
134 RemoteConfig::Ssh { ssh, .. } => {
135 if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
136 debug!("unknown_remote, ssh = {:?}", ssh);
137 if let Some(c) = sshr.connect(ssh, channel).await? {
138 return Ok(RemoteRepo::Ssh(c));
139 }
140 }
141 bail!("Remote not found: {:?}", ssh)
142 }
143 RemoteConfig::Http {
144 http,
145 headers,
146 name,
147 } => {
148 let mut h = Vec::new();
149 for (k, v) in headers.iter() {
150 match v {
151 RemoteHttpHeader::String(s) => {
152 h.push((k.clone(), s.clone()));
153 }
154 RemoteHttpHeader::Shell(shell) => {
155 h.push((k.clone(), shell_cmd(&shell.shell)?));
156 }
157 }
158 }
159 return Ok(RemoteRepo::Http(Http {
160 url: http.parse().unwrap(),
161 channel: channel.to_string(),
162 client: reqwest::ClientBuilder::new()
163 .danger_accept_invalid_certs(no_cert_check)
164 .build()?,
165 headers: h,
166 name: name.to_string(),
167 }));
168 }
169 }
170 }
171}
172
173pub async fn unknown_remote(
174 self_path: Option<&Path>,
175 user: Option<&str>,
176 name: &str,
177 channel: &str,
178 no_cert_check: bool,
179 with_path: bool,
180) -> Result<RemoteRepo, anyhow::Error> {
181 if let Ok(url) = url::Url::parse(name) {
182 let scheme = url.scheme();
183 if scheme == "http" || scheme == "https" {
184 debug!("unknown_remote, http = {:?}", name);
185 return Ok(RemoteRepo::Http(Http {
186 url,
187 channel: channel.to_string(),
188 client: reqwest::ClientBuilder::new()
189 .danger_accept_invalid_certs(no_cert_check)
190 .build()?,
191 headers: Vec::new(),
192 name: name.to_string(),
193 }));
194 } else if scheme == "ssh" {
195 if let Some(mut ssh) = ssh_remote(user, name, with_path) {
196 debug!("unknown_remote, ssh = {:?}", ssh);
197 if let Some(c) = ssh.connect(name, channel).await? {
198 return Ok(RemoteRepo::Ssh(c));
199 }
200 }
201 bail!("Remote not found: {:?}", name)
202 } else {
203 bail!("Remote scheme not supported: {:?}", scheme)
204 }
205 }
206 if let Ok(root) = std::fs::canonicalize(name) {
207 if let Some(path) = self_path {
208 let path = std::fs::canonicalize(path)?;
209 if path == root {
210 return Ok(RemoteRepo::LocalChannel(channel.to_string()));
211 }
212 }
213
214 let mut dot_dir = root.join(DOT_DIR);
215 let changes_dir = dot_dir.join(CHANGES_DIR);
216
217 dot_dir.push(PRISTINE_DIR);
218 debug!("dot_dir = {:?}", dot_dir);
219 match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
220 Ok(pristine) => {
221 debug!("pristine done");
222 return Ok(RemoteRepo::Local(Local {
223 root: Path::new(name).to_path_buf(),
224 channel: channel.to_string(),
225 changes_dir,
226 pristine: Arc::new(pristine),
227 name: name.to_string(),
228 }));
229 }
230 Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
231 sanakirja::Error::IO(e),
232 )) if e.kind() == std::io::ErrorKind::NotFound => {
233 debug!("repo not found")
234 }
235 Err(e) => return Err(e.into()),
236 }
237 }
238 if let Some(mut ssh) = ssh_remote(user, name, with_path) {
239 debug!("unknown_remote, ssh = {:?}", ssh);
240 if let Some(c) = ssh.connect(name, channel).await? {
241 return Ok(RemoteRepo::Ssh(c));
242 }
243 }
244 bail!("Remote not found: {:?}", name)
245}
246
247pub fn get_local_inodes(
249 txn: &mut MutTxn<()>,
250 channel: &ChannelRef<MutTxn<()>>,
251 repo: &Repository,
252 path: &[String],
253) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
254 let mut paths = HashSet::new();
255 for path in path.iter() {
256 let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
257 if ambiguous {
258 bail!("Ambiguous path: {:?}", path)
259 }
260 paths.insert(p);
261 paths.extend(
262 libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
263 );
264 }
265 Ok(paths)
266}
267
/// Result of comparing the local channel with a remote before a push.
pub struct PushDelta {
    /// Items to send to the remote, in the order they should be uploaded.
    pub to_upload: Vec<CS>,
    /// Carried over unchanged from `RemoteDelta::remote_unrecs`.
    /// NOTE(review): presumably changes that were unrecorded on the remote —
    /// confirm against the `remote_unrecs` function.
    pub remote_unrecs: Vec<(u64, CS)>,
    /// Items reported by the remote that are not known locally.
    pub unknown_changes: Vec<CS>,
}
278
/// Result of comparing the local state with a remote, before it is specialised
/// into a pull or a push.
pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
    /// Inode positions the operation is restricted to (empty when no path
    /// filter applies).
    pub inodes: HashSet<Position<Hash>>,
    /// Items that are missing locally and would have to be downloaded.
    pub to_download: Vec<CS>,
    /// Local cache of the remote, when one was opened; `None` for local
    /// channels and when the remote has no id.
    pub remote_ref: Option<RemoteRef<T>>,
    /// Changes recorded in our cached copy of the remote at or after the
    /// dichotomy point.
    pub ours_ge_dichotomy_set: HashSet<CS>,
    /// Set view of `theirs_ge_dichotomy` (changes, plus states for tagged
    /// entries where the builder adds them).
    pub theirs_ge_dichotomy_set: HashSet<CS>,
    /// Entries reported by the remote from the dichotomy point on, as
    /// `(position, change hash, state, is_tag)`.
    pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
    /// Output of the `remote_unrecs` function.
    /// NOTE(review): presumably changes unrecorded on the remote since the
    /// cache was last updated — confirm.
    pub remote_unrecs: Vec<(u64, CS)>,
}
311
312impl RemoteDelta<MutTxn<()>> {
313 pub fn to_local_channel_push(
316 self,
317 remote_channel: &str,
318 txn: &mut MutTxn<()>,
319 path: &[String],
320 channel: &ChannelRef<MutTxn<()>>,
321 repo: &Repository,
322 ) -> Result<PushDelta, anyhow::Error> {
323 let mut to_upload = Vec::new();
324 let inodes = get_local_inodes(txn, channel, repo, path)?;
325
326 for x in txn.reverse_log(&*channel.read(), None)? {
327 let (_, (h, _)) = x?;
328 if let Some(channel) = txn.load_channel(remote_channel)? {
329 let channel = channel.read();
330 let h_int = txn.get_internal(h)?.unwrap();
331 if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
332 if inodes.is_empty() {
333 to_upload.push(CS::Change(h.into()))
334 } else {
335 for p in inodes.iter() {
336 if txn.get_touched_files(p, Some(h_int))?.is_some() {
337 to_upload.push(CS::Change(h.into()));
338 break;
339 }
340 }
341 }
342 }
343 }
344 }
345 assert!(self.ours_ge_dichotomy_set.is_empty());
346 assert!(self.theirs_ge_dichotomy_set.is_empty());
347 let d = PushDelta {
348 to_upload: to_upload.into_iter().rev().collect(),
349 remote_unrecs: self.remote_unrecs,
350 unknown_changes: Vec::new(),
351 };
352 assert!(d.remote_unrecs.is_empty());
353 Ok(d)
354 }
355
356 pub fn to_remote_push(
359 self,
360 txn: &mut MutTxn<()>,
361 path: &[String],
362 channel: &ChannelRef<MutTxn<()>>,
363 repo: &Repository,
364 ) -> Result<PushDelta, anyhow::Error> {
365 let mut to_upload = Vec::new();
366 let inodes = get_local_inodes(txn, channel, repo, path)?;
367 if let Some(ref remote_ref) = self.remote_ref {
368 let mut tags: HashSet<Merkle> = HashSet::new();
369 for x in txn.rev_iter_tags(&channel.read().tags, None)? {
370 let (n, m) = x?;
371 debug!("rev_iter_tags {:?} {:?}", n, m);
372 if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
374 if p.b == m.b {
375 debug!("the remote has tag {:?}", p.a);
376 break;
377 }
378 if p.a != m.a {
379 }
383 } else {
384 tags.insert(m.a.into());
385 }
386 }
387 debug!("tags = {:?}", tags);
388 for x in txn.reverse_log(&*channel.read(), None)? {
389 let (_, (h, m)) = x?;
390 let h_unrecorded = self
391 .remote_unrecs
392 .iter()
393 .any(|(_, hh)| hh == &CS::Change(h.into()));
394 if !h_unrecorded {
395 if txn.remote_has_state(remote_ref, &m)?.is_some() {
396 debug!("remote_has_state: {:?}", m);
397 break;
398 }
399 }
400 let h_int = txn.get_internal(h)?.unwrap();
401 let h_deser = Hash::from(h);
402 if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
406 && !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
407 {
408 if inodes.is_empty() {
409 if tags.remove(&m.into()) {
410 to_upload.push(CS::State(m.into()));
411 }
412 to_upload.push(CS::Change(h_deser));
413 } else {
414 for p in inodes.iter() {
415 if txn.get_touched_files(p, Some(h_int))?.is_some() {
416 to_upload.push(CS::Change(h_deser));
417 if tags.remove(&m.into()) {
418 to_upload.push(CS::State(m.into()));
419 }
420 break;
421 }
422 }
423 }
424 }
425 }
426 for t in tags.iter() {
427 if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
428 if !txn.is_tagged(&remote_ref.lock().tags, n)? {
429 to_upload.push(CS::State(*t));
430 }
431 } else {
432 debug!("the remote doesn't have state {:?}", t);
433 }
434 }
435 }
436
437 let mut unknown_changes = Vec::new();
441 for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
442 let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
443 let change = CS::Change(*h);
444 if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
445 unknown_changes.push(change)
446 }
447 if *is_tag {
448 let m_is_known = if let Some(n) = txn
449 .channel_has_state(txn.states(&*channel.read()), &m.into())
450 .unwrap()
451 {
452 txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
453 } else {
454 false
455 };
456 if !m_is_known {
457 unknown_changes.push(CS::State(*m))
458 }
459 }
460 }
461
462 Ok(PushDelta {
463 to_upload: to_upload.into_iter().rev().collect(),
464 remote_unrecs: self.remote_unrecs,
465 unknown_changes,
466 })
467 }
468}
469
470pub fn update_changelist_local_channel(
474 remote_channel: &str,
475 txn: &mut MutTxn<()>,
476 path: &[String],
477 current_channel: &ChannelRef<MutTxn<()>>,
478 repo: &Repository,
479 specific_changes: &[String],
480) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
481 if !specific_changes.is_empty() {
482 let mut to_download = Vec::new();
483 for h in specific_changes {
484 let h = txn.hash_from_prefix(h)?.0;
485 if txn.get_revchanges(current_channel, &h)?.is_none() {
486 to_download.push(CS::Change(h));
487 }
488 }
489 Ok(RemoteDelta {
490 inodes: HashSet::new(),
491 to_download,
492 remote_ref: None,
493 ours_ge_dichotomy_set: HashSet::new(),
494 theirs_ge_dichotomy: Vec::new(),
495 theirs_ge_dichotomy_set: HashSet::new(),
496 remote_unrecs: Vec::new(),
497 })
498 } else {
499 let mut inodes = HashSet::new();
500 let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
501 let mut to_download = Vec::new();
502 inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
503 change: txn.get_external(&x.change).unwrap().unwrap().into(),
504 pos: x.pos,
505 }));
506 if let Some(remote_channel) = txn.load_channel(remote_channel)? {
507 let remote_channel = remote_channel.read();
508 for x in txn.reverse_log(&remote_channel, None)? {
509 let (_, (h, m)) = x?;
510 if txn
511 .channel_has_state(txn.states(&*current_channel.read()), &m)?
512 .is_some()
513 {
514 break;
515 }
516 let h_int = txn.get_internal(h)?.unwrap();
517 if txn
518 .get_changeset(txn.changes(&*current_channel.read()), h_int)?
519 .is_none()
520 {
521 if inodes_.is_empty()
522 || inodes_.iter().any(|&inode| {
523 txn.get_rev_touched_files(h_int, Some(&inode))
524 .unwrap()
525 .is_some()
526 })
527 {
528 to_download.push(CS::Change(h.into()));
529 }
530 }
531 }
532 }
533 Ok(RemoteDelta {
534 inodes,
535 to_download,
536 remote_ref: None,
537 ours_ge_dichotomy_set: HashSet::new(),
538 theirs_ge_dichotomy: Vec::new(),
539 theirs_ge_dichotomy_set: HashSet::new(),
540 remote_unrecs: Vec::new(),
541 })
542 }
543}
544
545impl RemoteRepo {
546 fn name(&self) -> Option<&str> {
547 match *self {
548 RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
549 RemoteRepo::Local(ref l) => Some(l.name.as_str()),
550 RemoteRepo::Http(ref h) => Some(h.name.as_str()),
551 RemoteRepo::LocalChannel(_) => None,
552 RemoteRepo::None => unreachable!(),
553 }
554 }
555
556 pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
557 match *self {
558 RemoteRepo::Ssh(ref s) => {
559 if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
560 Ok(Some(s.name.split_at(sep + 1).1.to_string()))
561 } else {
562 Ok(Some(s.name.as_str().to_string()))
563 }
564 }
565 RemoteRepo::Local(ref l) => {
566 if let Some(file) = l.root.file_name() {
567 Ok(Some(
568 file.to_str()
569 .context("failed to decode local repository name")?
570 .to_string(),
571 ))
572 } else {
573 Ok(None)
574 }
575 }
576 RemoteRepo::Http(ref h) => {
577 if let Some(name) = libpijul::path::file_name(h.url.path()) {
578 if !name.trim().is_empty() {
579 return Ok(Some(name.trim().to_string()));
580 }
581 }
582 Ok(h.url.host().map(|h| h.to_string()))
583 }
584 RemoteRepo::LocalChannel(_) => Ok(None),
585 RemoteRepo::None => unreachable!(),
586 }
587 }
588
589 pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
590 if let RemoteRepo::Ssh(s) = self {
591 s.finish().await?
592 }
593 Ok(())
594 }
595
596 pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
597 &mut self,
598 txn: &mut T,
599 path: &[String],
600 ) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
601 debug!("update_changelist");
602 let id = if let Some(id) = self.get_id(txn).await? {
603 id
604 } else {
605 return Ok(None);
606 };
607 let mut remote = if let Some(name) = self.name() {
608 txn.open_or_create_remote(id, name)?
609 } else {
610 return Ok(None);
611 };
612 let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
613 debug!("update changelist {:?}", n);
614 let v: Vec<_> = txn
615 .iter_remote(&remote.lock().remote, n)?
616 .filter_map(|k| {
617 debug!("filter_map {:?}", k);
618 let k = (*k.unwrap().0).into();
619 if k >= n {
620 Some(k)
621 } else {
622 None
623 }
624 })
625 .collect();
626 for k in v {
627 debug!("deleting {:?}", k);
628 txn.del_remote(&mut remote, k)?;
629 }
630 let v: Vec<_> = txn
631 .iter_tags(&remote.lock().tags, n)?
632 .filter_map(|k| {
633 debug!("filter_map {:?}", k);
634 let k = (*k.unwrap().0).into();
635 if k >= n {
636 Some(k)
637 } else {
638 None
639 }
640 })
641 .collect();
642 for k in v {
643 debug!("deleting {:?}", k);
644 txn.del_tags(&mut remote.lock().tags, k)?;
645 }
646
647 debug!("deleted");
648 let paths = self.download_changelist(txn, &mut remote, n, path).await?;
649 Ok(Some((paths, remote)))
650 }
651
652 async fn update_changelist_pushpull_from_scratch(
653 &mut self,
654 txn: &mut MutTxn<()>,
655 path: &[String],
656 current_channel: &ChannelRef<MutTxn<()>>,
657 ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
658 debug!("no id, starting from scratch");
659 let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
660 let mut theirs_ge_dichotomy_set = HashSet::new();
661 let mut to_download = Vec::new();
662 for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
663 theirs_ge_dichotomy_set.insert(CS::Change(*h));
664 if txn.get_revchanges(current_channel, h)?.is_none() {
665 to_download.push(CS::Change(*h));
666 }
667 if *is_tag {
668 let ch = current_channel.read();
669 if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
670 if !txn.is_tagged(txn.tags(&*ch), n.into())? {
671 to_download.push(CS::State(*m));
672 }
673 } else {
674 to_download.push(CS::State(*m));
675 }
676 }
677 }
678 Ok(RemoteDelta {
679 inodes,
680 remote_ref: None,
681 to_download,
682 ours_ge_dichotomy_set: HashSet::new(),
683 theirs_ge_dichotomy,
684 theirs_ge_dichotomy_set,
685 remote_unrecs: Vec::new(),
686 })
687 }
688
689 pub async fn update_changelist_pushpull(
702 &mut self,
703 txn: &mut MutTxn<()>,
704 path: &[String],
705 current_channel: &ChannelRef<MutTxn<()>>,
706 force_cache: Option<bool>,
707 repo: &Repository,
708 specific_changes: &[String],
709 is_pull: bool,
710 ) -> Result<RemoteDelta<MutTxn<()>>, anyhow::Error> {
711 debug!("update_changelist_pushpull");
712 if let RemoteRepo::LocalChannel(c) = self {
713 return update_changelist_local_channel(
714 c,
715 txn,
716 path,
717 current_channel,
718 repo,
719 specific_changes,
720 );
721 }
722
723 let id = if let Some(id) = self.get_id(txn).await? {
724 debug!("id = {:?}", id);
725 id
726 } else {
727 return self
728 .update_changelist_pushpull_from_scratch(txn, path, current_channel)
729 .await;
730 };
731 let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
732 let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
733 let ours_ge_dichotomy: Vec<(u64, CS)> = txn
734 .iter_remote(&remote_ref.lock().remote, dichotomy_n)?
735 .filter_map(|k| {
736 debug!("filter_map {:?}", k);
737 match k.unwrap() {
738 (k, libpijul::pristine::Pair { a: hash, .. }) => {
739 let (k, hash) = (u64::from(*k), Hash::from(*hash));
740 if k >= dichotomy_n {
741 Some((k, CS::Change(hash)))
742 } else {
743 None
744 }
745 }
746 }
747 })
748 .collect();
749 let (inodes, theirs_ge_dichotomy) =
750 self.download_changelist_nocache(dichotomy_n, path).await?;
751 debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
752 let ours_ge_dichotomy_set = ours_ge_dichotomy
753 .iter()
754 .map(|(_, h)| h)
755 .copied()
756 .collect::<HashSet<CS>>();
757 let mut theirs_ge_dichotomy_set = HashSet::new();
758 for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
759 theirs_ge_dichotomy_set.insert(CS::Change(*h));
760 if *is_tag {
761 theirs_ge_dichotomy_set.insert(CS::State(*m));
762 }
763 }
764
765 let remote_unrecs = remote_unrecs(
767 txn,
768 current_channel,
769 &ours_ge_dichotomy,
770 &theirs_ge_dichotomy_set,
771 )?;
772 let should_cache = if let Some(true) = force_cache {
773 true
774 } else {
775 remote_unrecs.is_empty()
776 };
777 debug!(
778 "should_cache = {:?} {:?} {:?}",
779 force_cache, remote_unrecs, should_cache
780 );
781 if should_cache {
782 use libpijul::ChannelMutTxnT;
783 for (k, t) in ours_ge_dichotomy.iter().copied() {
784 match t {
785 CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
786 CS::Change(_) => {
787 txn.del_remote(&mut remote_ref, k)?;
788 }
789 }
790 }
791 for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
792 debug!("theirs: {:?} {:?} {:?}", n, h, m);
793 txn.put_remote(&mut remote_ref, n, (h, m))?;
794 if is_tag {
795 txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
796 }
797 }
798 }
799 if !specific_changes.is_empty() {
800 let to_download = specific_changes
802 .iter()
803 .map(|h| {
804 if is_pull {
805 {
806 if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
807 return Ok(CS::State(t.0));
808 }
809 }
810 Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
811 } else {
812 if let Ok(t) = txn.state_from_prefix(¤t_channel.read().states, h) {
813 Ok(CS::State(t.0))
814 } else {
815 Ok(CS::Change(txn.hash_from_prefix(h)?.0))
816 }
817 }
818 })
819 .collect::<Result<Vec<_>, anyhow::Error>>();
820 Ok(RemoteDelta {
821 inodes,
822 remote_ref: Some(remote_ref),
823 to_download: to_download?,
824 ours_ge_dichotomy_set,
825 theirs_ge_dichotomy,
826 theirs_ge_dichotomy_set,
827 remote_unrecs,
828 })
829 } else {
830 let mut to_download: Vec<CS> = Vec::new();
831 let mut to_download_ = HashSet::new();
832 for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
833 let (_, p) = x?;
834 let h: Hash = p.a.into();
835 if txn
836 .channel_has_state(txn.states(¤t_channel.read()), &p.b)
837 .unwrap()
838 .is_some()
839 {
840 break;
841 }
842 if txn.get_revchanges(¤t_channel, &h).unwrap().is_none() {
843 let h = CS::Change(h);
844 if to_download_.insert(h.clone()) {
845 to_download.push(h);
846 }
847 }
848 }
849
850 for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
853 debug!(
854 "update_changelist_pushpull line {}, {:?} {:?}",
855 line!(),
856 n,
857 h
858 );
859 let ch = CS::Change(*h);
861 if txn.get_revchanges(¤t_channel, h).unwrap().is_none() {
862 if to_download_.insert(ch.clone()) {
863 to_download.push(ch.clone());
864 }
865 if *is_tag {
866 to_download.push(CS::State(*m));
867 }
868 } else if *is_tag {
869 let has_tag = if let Some(n) =
870 txn.channel_has_state(txn.states(¤t_channel.read()), &m.into())?
871 {
872 txn.is_tagged(txn.tags(¤t_channel.read()), n.into())?
873 } else {
874 false
875 };
876 if !has_tag {
877 to_download.push(CS::State(*m));
878 }
879 }
880 if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
883 use libpijul::ChannelMutTxnT;
884 txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
885 if *is_tag {
886 let mut rem = remote_ref.lock();
887 txn.put_tags(&mut rem.tags, *n, m)?;
888 }
889 }
890 }
891 Ok(RemoteDelta {
892 inodes,
893 remote_ref: Some(remote_ref),
894 to_download,
895 ours_ge_dichotomy_set,
896 theirs_ge_dichotomy,
897 theirs_ge_dichotomy_set,
898 remote_unrecs,
899 })
900 }
901 }
902
903 pub async fn download_changelist_nocache(
907 &mut self,
908 from: u64,
909 paths: &[String],
910 ) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
911 let mut v = Vec::new();
912 let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, m2| {
913 debug!("no cache: {:?}", h);
914 Ok(v.push((n, h, m, m2)))
915 };
916 let r = match *self {
917 RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
918 RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
919 RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
920 RemoteRepo::LocalChannel(_) => HashSet::new(),
921 RemoteRepo::None => unreachable!(),
922 };
923 Ok((r, v))
924 }
925
926 async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
930 &mut self,
931 txn: &T,
932 remote: &libpijul::pristine::Remote<T>,
933 ) -> Result<u64, anyhow::Error> {
934 let mut a = 0;
935 let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
936 debug!("dichotomy_changelist: {:?} {:?}", u, v);
937 (u, (&v.b).into())
938 } else {
939 debug!("the local copy of the remote has no changes");
940 return Ok(0);
941 };
942 let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
943 v.into()
944 } else {
945 Merkle::zero()
946 };
947 debug!("last_state: {:?} {:?}", state, last_statet);
948 if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
949 debug!("remote last_state: {:?} {:?}", s, st);
950 if s == state && st == last_statet {
951 return Ok(b + 1);
953 }
954 }
955 while a < b {
959 let mid = (a + b) / 2;
960 let (mid, state) = {
961 let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
962 (a, b.b)
963 };
964 let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
965 b.b.into()
968 } else {
969 last_statet
972 };
973
974 let remote_state = self.get_state(txn, Some(mid)).await?;
975 debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
976 if let Some((_, remote_state, remote_statet)) = remote_state {
977 if remote_state == state && remote_statet == statet {
978 if a == mid {
979 return Ok(a + 1);
980 } else {
981 a = mid;
982 continue;
983 }
984 }
985 }
986 if b == mid {
987 break;
988 } else {
989 b = mid
990 }
991 }
992 Ok(a)
993 }
994
995 async fn get_state<T: libpijul::TxnTExt>(
996 &mut self,
997 txn: &T,
998 mid: Option<u64>,
999 ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
1000 match *self {
1001 RemoteRepo::Local(ref mut l) => l.get_state(mid),
1002 RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
1003 RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
1004 RemoteRepo::LocalChannel(ref channel) => {
1005 if let Some(channel) = txn.load_channel(&channel)? {
1006 local::get_state(txn, &channel, mid)
1007 } else {
1008 Ok(None)
1009 }
1010 }
1011 RemoteRepo::None => unreachable!(),
1012 }
1013 }
1014
1015 async fn get_id<T: libpijul::TxnTExt + 'static>(
1019 &mut self,
1020 txn: &T,
1021 ) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
1022 match *self {
1023 RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
1024 RemoteRepo::Ssh(ref mut s) => s.get_id().await,
1025 RemoteRepo::Http(ref h) => h.get_id().await,
1026 RemoteRepo::LocalChannel(ref channel) => {
1027 if let Some(channel) = txn.load_channel(&channel)? {
1028 Ok(txn.id(&*channel.read()).cloned())
1029 } else {
1030 Err(anyhow::anyhow!(
1031 "Unable to retrieve RemoteId for LocalChannel remote"
1032 ))
1033 }
1034 }
1035 RemoteRepo::None => unreachable!(),
1036 }
1037 }
1038
1039 pub async fn archive<W: std::io::Write + Send + 'static>(
1040 &mut self,
1041 prefix: Option<String>,
1042 state: Option<(Merkle, &[Hash])>,
1043 umask: u16,
1044 w: W,
1045 ) -> Result<u64, anyhow::Error> {
1046 match *self {
1047 RemoteRepo::Local(ref mut l) => {
1048 debug!("archiving local repo");
1049 let changes = libpijul::changestore::filesystem::FileSystem::from_root(
1050 &l.root,
1051 pijul_repository::max_files()?,
1052 );
1053 let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
1054 let conflicts = if let Some((state, extra)) = state {
1055 let txn = l.pristine.arc_txn_begin()?;
1056 let channel = {
1057 let txn = txn.read();
1058 txn.load_channel(&l.channel)?.unwrap()
1059 };
1060 txn.archive_with_state(&changes, &channel, &state, extra, &mut tarball, 0)?
1061 } else {
1062 let txn = l.pristine.arc_txn_begin()?;
1063 let channel = {
1064 let txn = txn.read();
1065 txn.load_channel(&l.channel)?.unwrap()
1066 };
1067 txn.archive(&changes, &channel, &mut tarball)?
1068 };
1069 Ok(conflicts.len() as u64)
1070 }
1071 RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
1072 RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
1073 RemoteRepo::LocalChannel(_) => unreachable!(),
1074 RemoteRepo::None => unreachable!(),
1075 }
1076 }
1077
1078 async fn download_changelist<T: MutTxnTExt>(
1079 &mut self,
1080 txn: &mut T,
1081 remote: &mut RemoteRef<T>,
1082 from: u64,
1083 paths: &[String],
1084 ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
1085 let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
1086 let (ref mut txn, ref mut remote) = *a;
1087 txn.put_remote(remote, n, (h, m))?;
1088 if is_tag {
1089 txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
1090 }
1091 Ok(())
1092 };
1093 match *self {
1094 RemoteRepo::Local(ref mut l) => {
1095 l.download_changelist(f, &mut (txn, remote), from, paths)
1096 }
1097 RemoteRepo::Ssh(ref mut s) => {
1098 s.download_changelist(f, &mut (txn, remote), from, paths)
1099 .await
1100 }
1101 RemoteRepo::Http(ref h) => {
1102 h.download_changelist(f, &mut (txn, remote), from, paths)
1103 .await
1104 }
1105 RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
1106 RemoteRepo::None => unreachable!(),
1107 }
1108 }
1109
1110 pub async fn upload_changes<T: MutTxnTExt + 'static>(
1111 &mut self,
1112 txn: &mut T,
1113 local: PathBuf,
1114 to_channel: Option<&str>,
1115 changes: &[CS],
1116 ) -> Result<(), anyhow::Error> {
1117 let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
1118
1119 match self {
1120 RemoteRepo::Local(ref mut l) => {
1121 l.upload_changes(upload_bar, local, to_channel, changes)?
1122 }
1123 RemoteRepo::Ssh(ref mut s) => {
1124 s.upload_changes(upload_bar, local, to_channel, changes)
1125 .await?
1126 }
1127 RemoteRepo::Http(ref h) => {
1128 h.upload_changes(upload_bar, local, to_channel, changes)
1129 .await?
1130 }
1131 RemoteRepo::LocalChannel(ref channel) => {
1132 let mut channel = txn.open_or_create_channel(channel)?;
1133 let store = libpijul::changestore::filesystem::FileSystem::from_changes(
1134 local,
1135 pijul_repository::max_files()?,
1136 );
1137 local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
1138 }
1139 RemoteRepo::None => unreachable!(),
1140 }
1141 Ok(())
1142 }
1143
1144 pub async fn download_changes(
1146 &mut self,
1147 progress_bar: ProgressBar,
1148 hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
1149 send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
1150 path: &mut PathBuf,
1151 full: bool,
1152 ) -> Result<bool, anyhow::Error> {
1153 debug!("download_changes");
1154 match *self {
1155 RemoteRepo::Local(ref mut l) => {
1156 l.download_changes(progress_bar, hashes, send, path).await?
1157 }
1158 RemoteRepo::Ssh(ref mut s) => {
1159 s.download_changes(progress_bar, hashes, send, path, full)
1160 .await?
1161 }
1162 RemoteRepo::Http(ref mut h) => {
1163 h.download_changes(progress_bar, hashes, send, path, full)
1164 .await?
1165 }
1166 RemoteRepo::LocalChannel(_) => {
1167 while let Some(c) = hashes.recv().await {
1168 send.send((c, true)).await?;
1169 }
1170 }
1171 RemoteRepo::None => unreachable!(),
1172 }
1173 Ok(true)
1174 }
1175
1176 pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
1177 &mut self,
1178 repo: &mut Repository,
1179 remote: &RemoteRef<T>,
1180 ) -> Result<(), anyhow::Error> {
1181 debug!("Downloading identities");
1182 let mut id_path = repo.path.clone();
1183 id_path.push(DOT_DIR);
1184 id_path.push("identities");
1185 let rev = None;
1186 let r = match *self {
1187 RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
1188 RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
1189 RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
1190 RemoteRepo::LocalChannel(_) => 0,
1191 RemoteRepo::None => unreachable!(),
1192 };
1193 remote.set_id_revision(r);
1194 Ok(())
1195 }
1196
1197 pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
1198 match *self {
1199 RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
1200 RemoteRepo::Http(ref mut h) => h.prove(key).await,
1201 RemoteRepo::None => unreachable!(),
1202 _ => Ok(()),
1203 }
1204 }
1205
    /// Downloads the entries of `to_apply` from this remote and, when
    /// `do_apply` is set, applies the downloaded changes to `channel`.
    ///
    /// * `to_apply` - changes/states to request from the remote.
    /// * `inodes` - when non-empty, only entries that touch one of these
    ///   positions are kept (and applied); when empty, every entry is kept.
    /// * `do_apply` - whether kept `CS::Change` entries are applied to
    ///   `channel` through `txn`.
    ///
    /// Returns the entries of `to_apply`, in their original order, that were
    /// reported ready by the download pipeline and passed the `inodes` filter.
    ///
    /// NOTE(review): `self` is swapped for `RemoteRepo::None` while the
    /// download task runs and is only restored by the final `t.await??`; any
    /// earlier `?` return leaves `self` as `RemoteRepo::None`.
    pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
        &mut self,
        repo: &mut Repository,
        txn: &mut T,
        channel: &mut ChannelRef<T>,
        to_apply: &[CS],
        inodes: &HashSet<Position<Hash>>,
        do_apply: bool,
    ) -> Result<Vec<CS>, anyhow::Error> {
        let apply_len = to_apply.len() as u64;
        let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
        // The "apply" bar only exists when we are actually going to apply.
        let apply_bar = if do_apply {
            Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
        } else {
            None
        };

        // `send`/`recv`: download task -> dependency-resolution task.
        let (mut send, recv) = tokio::sync::mpsc::channel(100);

        // Move the real remote into the spawned task; `self` holds a
        // placeholder until the task hands it back.
        let mut self_ = std::mem::replace(self, RemoteRepo::None);
        // `hash_send`/`hash_recv`: requests for the download task.
        let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
        let mut change_path_ = repo.path.clone();
        change_path_.push(DOT_DIR);
        change_path_.push("changes");
        let cloned_download_bar = download_bar.clone();
        let t = tokio::spawn(async move {
            self_
                .download_changes(
                    cloned_download_bar,
                    &mut hash_recv,
                    &mut send,
                    &mut change_path_,
                    false,
                )
                .await?;

            // Hand the remote back so the caller can restore `self`.
            Ok::<_, anyhow::Error>(self_)
        });

        // NOTE(review): this second `change_path_` is pushed to and popped
        // from in the loop below but never read in between.
        let mut change_path_ = repo.changes_dir.clone();
        let mut waiting = 0;
        // `send_ready`/`recv_ready`: entries that are ready to be applied.
        let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);

        // Queue every requested entry and remember it as already asked for.
        let mut asked = HashSet::new();
        for h in to_apply {
            debug!("to_apply {:?}", h);
            match h {
                CS::Change(h) => {
                    libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
                }
                CS::State(h) => {
                    libpijul::changestore::filesystem::push_tag_filename(&mut change_path_, h);
                }
            }
            asked.insert(*h);
            hash_send.send(*h)?;
            waiting += 1;
            libpijul::changestore::filesystem::pop_filename(&mut change_path_);
        }

        // Called on the placeholder `self`; `download_changes_rec` does not
        // read `self`, it only spawns the dependency-resolution task.
        let u = self
            .download_changes_rec(
                repo,
                hash_send,
                recv,
                send_ready,
                download_bar,
                waiting,
                asked,
            )
            .await?;

        let mut ws = libpijul::ApplyWorkspace::new();
        let mut to_apply_inodes = HashSet::new();
        while let Some(h) = recv_ready.recv().await {
            debug!("to_apply: {:?}", h);
            // An entry is kept when no inode filter was given, or when one of
            // its hunks refers to a filtered position, or when it is itself
            // the change that introduced one of the filtered positions.
            let touches_inodes = inodes.is_empty()
                || {
                    debug!("inodes = {:?}", inodes);
                    use libpijul::changestore::ChangeStore;
                    if let CS::Change(ref h) = h {
                        let changes = repo.changes.get_changes(h)?;
                        changes.iter().any(|c| {
                            c.iter().any(|c| {
                                let inode = c.inode();
                                debug!("inode = {:?}", inode);
                                if let Some(h) = inode.change {
                                    inodes.contains(&Position {
                                        change: h,
                                        pos: inode.pos,
                                    })
                                } else {
                                    false
                                }
                            })
                        })
                    } else {
                        false
                    }
                }
                || { inodes.iter().any(|i| CS::Change(i.change) == h) };

            if touches_inodes {
                to_apply_inodes.insert(h);
            } else {
                continue;
            }

            if let Some(apply_bar) = apply_bar.clone() {
                info!("Applying {:?}", h);
                apply_bar.inc(1);
                debug!("apply");
                // Only changes are applied here; `CS::State` entries are
                // counted on the bar but not applied.
                if let CS::Change(h) = h {
                    let mut channel = channel.write();
                    txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
                }
                debug!("applied");
            } else {
                debug!("not applying {:?}", h)
            }
        }

        // Report the kept entries in the order the caller asked for them.
        let mut result = Vec::with_capacity(to_apply_inodes.len());
        for h in to_apply {
            if to_apply_inodes.contains(&h) {
                result.push(*h)
            }
        }

        debug!("finished");
        debug!("waiting for spawned process");
        // Restore the real remote, then surface any error from the
        // dependency-resolution task.
        *self = t.await??;
        u.await??;
        Ok(result)
    }
1341
    /// Spawns the task that turns "downloaded" signals into "ready" entries,
    /// requesting missing dependencies along the way.
    ///
    /// * `send_hash` - used to request additional (dependency) changes.
    /// * `recv_signal` - yields `(entry, follow)` pairs; `follow` selects
    ///   whether the entry's dependencies are inspected.
    /// * `send_ready` - receives each change once it has been handled.
    /// * `progress_bar` - bumped once per newly requested dependency.
    /// * `waiting` - number of entries already requested by the caller.
    /// * `asked` - entries already requested, to avoid duplicate requests.
    ///
    /// Returns the `JoinHandle` of the spawned task. `self` is not read.
    async fn download_changes_rec(
        &mut self,
        repo: &mut Repository,
        send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
        mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
        send_ready: tokio::sync::mpsc::Sender<CS>,
        progress_bar: ProgressBar,
        mut waiting: usize,
        mut asked: HashSet<CS>,
    ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
        let mut change_path = repo.changes_dir.clone();
        let mut dep_path = repo.changes_dir.clone();
        let changes = repo.changes.clone();
        let t = tokio::spawn(async move {
            // Nothing was requested: finish immediately, which drops the
            // channel endpoints owned by this task.
            if waiting == 0 {
                return Ok(());
            }
            // Changes that had at least one missing dependency when they
            // arrived; they are only reported after the loop ends.
            let mut ready = Vec::new();
            while let Some((hash, follow)) = recv_signal.recv().await {
                debug!("received {:?} {:?}", hash, follow);
                // Only `CS::Change` signals are processed; a `CS::State`
                // signal neither decrements `waiting` nor is forwarded.
                if let CS::Change(hash) = hash {
                    waiting -= 1;
                    if follow {
                        libpijul::changestore::filesystem::push_filename(&mut change_path, &hash);
                        // Make sure the directory that holds this change exists.
                        std::fs::create_dir_all(change_path.parent().unwrap())?;
                        use libpijul::changestore::ChangeStore;
                        let mut needs_dep = false;
                        for dep in changes.get_dependencies(&hash)? {
                            let dep: libpijul::pristine::Hash = dep;

                            // A dependency counts as present when its file
                            // exists in the changes directory.
                            libpijul::changestore::filesystem::push_filename(&mut dep_path, &dep);
                            let has_dep = std::fs::metadata(&dep_path).is_ok();
                            libpijul::changestore::filesystem::pop_filename(&mut dep_path);

                            if !has_dep {
                                needs_dep = true;
                                // Request each missing dependency only once.
                                if asked.insert(CS::Change(dep)) {
                                    progress_bar.inc(1);
                                    send_hash.send(CS::Change(dep))?;
                                    waiting += 1
                                }
                            }
                        }
                        libpijul::changestore::filesystem::pop_filename(&mut change_path);

                        if !needs_dep {
                            send_ready.send(CS::Change(hash)).await?;
                        } else {
                            ready.push(CS::Change(hash))
                        }
                    } else {
                        send_ready.send(CS::Change(hash)).await?;
                    }
                }
                // Every requested entry has been accounted for.
                if waiting == 0 {
                    break;
                }
            }
            info!("waiting loop done");
            // Flush the deferred changes in the order they were received.
            for r in ready {
                send_ready.send(r).await?;
            }
            std::mem::drop(recv_signal);
            Ok(())
        });
        Ok(t)
    }
1409
1410 pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1411 &mut self,
1412 repo: &mut Repository,
1413 txn: &mut T,
1414 channel: &mut ChannelRef<T>,
1415 tag: &[Hash],
1416 ) -> Result<(), anyhow::Error> {
1417 let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1418 let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
1419 let mut self_ = std::mem::replace(self, RemoteRepo::None);
1420 let mut change_path_ = repo.changes_dir.clone();
1421 let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
1422 let cloned_download_bar = download_bar.clone();
1423
1424 let t = tokio::spawn(async move {
1425 self_
1426 .download_changes(
1427 cloned_download_bar,
1428 &mut recv_hash,
1429 &mut send_signal,
1430 &mut change_path_,
1431 false,
1432 )
1433 .await?;
1434 Ok(self_)
1435 });
1436
1437 let mut waiting = 0;
1438 let mut asked = HashSet::new();
1439 for &h in tag.iter() {
1440 waiting += 1;
1441 send_hash.send(CS::Change(h))?;
1442 asked.insert(CS::Change(h));
1443 }
1444
1445 let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1446
1447 let u = self
1448 .download_changes_rec(
1449 repo,
1450 send_hash,
1451 recv_signal,
1452 send_ready,
1453 download_bar,
1454 waiting,
1455 asked,
1456 )
1457 .await?;
1458
1459 let mut hashes = Vec::new();
1460 let mut ws = libpijul::ApplyWorkspace::new();
1461 {
1462 let mut channel_ = channel.write();
1463 while let Some(hash) = recv_ready.recv().await {
1464 if let CS::Change(ref hash) = hash {
1465 txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
1466 }
1467 hashes.push(hash);
1468 }
1469 }
1470 let r: Result<_, anyhow::Error> = t.await?;
1471 *self = r?;
1472 u.await??;
1473 self.complete_changes(repo, txn, channel, &hashes, false)
1474 .await?;
1475 Ok(())
1476 }
1477
1478 pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1479 &mut self,
1480 repo: &mut Repository,
1481 txn: &mut T,
1482 channel: &mut ChannelRef<T>,
1483 state: Merkle,
1484 ) -> Result<(), anyhow::Error> {
1485 let id = if let Some(id) = self.get_id(txn).await? {
1486 id
1487 } else {
1488 return Ok(());
1489 };
1490 self.update_changelist(txn, &[]).await?;
1491 let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
1492 let mut to_pull = Vec::new();
1493 let mut found = false;
1494 for x in txn.iter_remote(&remote.lock().remote, 0)? {
1495 let (n, p) = x?;
1496 debug!("{:?} {:?}", n, p);
1497 to_pull.push(CS::Change(p.a.into()));
1498 if p.b == state {
1499 found = true;
1500 break;
1501 }
1502 }
1503 if !found {
1504 bail!("State not found: {:?}", state)
1505 }
1506 self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
1507 .await?;
1508 self.update_identities(repo, &remote).await?;
1509
1510 self.complete_changes(repo, txn, channel, &to_pull, false)
1511 .await?;
1512 Ok(())
1513 }
1514
    /// Requests a further download from the remote for those entries of
    /// `changes` whose contents the local change store does not have.
    ///
    /// * `changes` - candidate entries; `CS::State` entries are skipped.
    /// * `full` - when `true`, every change lacking contents is requested;
    ///   when `false`, a change is only requested if the graph of
    ///   `local_channel` still has an edge flagged as an alive parent
    ///   starting at that change's first vertex.
    ///
    /// NOTE(review): `self` is swapped for `RemoteRepo::None` before the
    /// progress bar and spinner are created and is only restored by the
    /// final `t.await??`; any earlier `?` return leaves `self` as
    /// `RemoteRepo::None`.
    pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
        &mut self,
        repo: &pijul_repository::Repository,
        txn: &T,
        local_channel: &mut ChannelRef<T>,
        changes: &[CS],
        full: bool,
    ) -> Result<(), anyhow::Error> {
        debug!("complete changes {:?}", changes);
        use libpijul::changestore::ChangeStore;
        // `send_hash`: requests for the download task;
        // `recv_sig`: per-entry notifications coming back from it.
        let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
        let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
        // Move the real remote into the spawned task.
        let mut self_ = std::mem::replace(self, RemoteRepo::None);
        let mut changes_dir = repo.changes_dir.clone();

        let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
        // Kept alive for the duration of the function.
        let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
        let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
            tokio::spawn(async move {
                // NOTE(review): the last argument is `true` here but `false`
                // in `pull`/`clone_tag`; presumably it asks for full contents
                // - confirm against `download_changes`.
                self_
                    .download_changes(
                        download_bar,
                        &mut recv_hash,
                        &mut send_sig,
                        &mut changes_dir,
                        true,
                    )
                    .await?;
                Ok::<_, anyhow::Error>(self_)
            });

        for c in changes {
            // Only changes can be completed; skip states.
            let c = if let CS::Change(c) = c { c } else { continue };
            let sc = c.into();
            // Nothing to do when the store already has the contents.
            if repo
                .changes
                .has_contents(*c, txn.get_internal(&sc)?.cloned())
            {
                debug!("has contents {:?}", c);
                continue;
            }
            if full {
                debug!("sending send_hash");
                send_hash.send(CS::Change(*c))?;
                debug!("sent");
                continue;
            }
            // Changes without an internal id in `txn` are skipped.
            let change = if let Some(&i) = txn.get_internal(&sc)? {
                i
            } else {
                debug!("could not find internal for {:?}", sc);
                continue;
            };
            // Start iterating the graph at position 0 of this change.
            let v = libpijul::pristine::Vertex {
                change,
                start: libpijul::pristine::ChangePosition(0u64.into()),
                end: libpijul::pristine::ChangePosition(0u64.into()),
            };
            let channel = local_channel.read();
            let graph = txn.graph(&channel);
            for x in txn.iter_graph(graph, Some(&v))? {
                let (v, e) = x?;
                if v.change > change {
                    // Iteration has moved past this change's vertices.
                    break;
                } else if e.flag().is_alive_parent() {
                    // One alive-parent edge is enough to request the change.
                    send_hash.send(CS::Change(*c))?;
                    break;
                }
            }
        }
        debug!("dropping send_hash");
        // Drop the request sender (no more requests will be made), then
        // drain the notifications until the task closes its end.
        std::mem::drop(send_hash);
        while recv_sig.recv().await.is_some() {}
        // Restore the real remote.
        *self = t.await??;
        Ok(())
    }
1592
1593 pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1594 &mut self,
1595 repo: &mut Repository,
1596 txn: &mut T,
1597 local_channel: &mut ChannelRef<T>,
1598 path: &[String],
1599 ) -> Result<(), anyhow::Error> {
1600 let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
1601 x
1602 } else {
1603 bail!("Channel not found")
1604 };
1605 let mut pullable = Vec::new();
1606 {
1607 let rem = remote_changes.lock();
1608 for x in txn.iter_remote(&rem.remote, 0)? {
1609 let (_, p) = x?;
1610 pullable.push(CS::Change(p.a.into()))
1611 }
1612 }
1613 self.pull(repo, txn, local_channel, &pullable, &inodes, true)
1614 .await?;
1615 self.update_identities(repo, &remote_changes).await?;
1616
1617 self.complete_changes(repo, txn, local_channel, &pullable, false)
1618 .await?;
1619 Ok(())
1620 }
1621}
1622
1623use libpijul::pristine::{ChangePosition, Position};
1624use regex::Regex;
1625
lazy_static! {
    // Matches a changelist entry of the form `<num>.<hash>.<merkle>`, with an
    // optional trailing `.` captured as `tag`. The pattern is not anchored,
    // so it matches anywhere inside the line.
    static ref CHANGELIST_LINE: Regex = Regex::new(
        r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#
    )
    .unwrap();
    // Matches a position entry of the form `<hash>.<num>`. Also unanchored.
    static ref PATHS_LINE: Regex =
        Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap();
}
1634
/// One parsed line of a remote listing, as produced by `parse_line`.
enum ListLine {
    /// A changelist entry (`<num>.<hash>.<merkle>[.]`).
    Change {
        /// The numeric `num` field of the line.
        n: u64,
        /// The change hash decoded from the base32 `hash` field.
        h: Hash,
        /// The Merkle value decoded from the base32 `merkle` field.
        m: Merkle,
        /// Whether the line carried the optional trailing `.`.
        tag: bool,
    },
    /// A position entry (`<hash>.<num>`).
    Position(Position<Hash>),
    /// The text following an `error:` prefix.
    Error(String),
}
1645
1646fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
1647 debug!("data = {:?}", data);
1648 if let Some(caps) = CHANGELIST_LINE.captures(data) {
1649 if let (Some(h), Some(m)) = (
1650 Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
1651 Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
1652 ) {
1653 return Ok(ListLine::Change {
1654 n: caps.name("num").unwrap().as_str().parse().unwrap(),
1655 h,
1656 m,
1657 tag: caps.name("tag").is_some(),
1658 });
1659 }
1660 }
1661 if data.starts_with("error:") {
1662 return Ok(ListLine::Error(data.split_at(6).1.to_string()));
1663 }
1664 if let Some(caps) = PATHS_LINE.captures(data) {
1665 return Ok(ListLine::Position(Position {
1666 change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
1667 pos: ChangePosition(
1668 caps.name("num")
1669 .unwrap()
1670 .as_str()
1671 .parse::<u64>()
1672 .unwrap()
1673 .into(),
1674 ),
1675 }));
1676 }
1677 debug!("offending line: {:?}", data);
1678 bail!("Protocol error")
1679}
1680
1681fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
1685 txn: &T,
1686 current_channel: &ChannelRef<T>,
1687 ours_ge_dichotomy: &[(u64, CS)],
1688 theirs_ge_dichotomy_set: &HashSet<CS>,
1689) -> Result<Vec<(u64, CS)>, anyhow::Error> {
1690 let mut remote_unrecs = Vec::new();
1691 for (n, hash) in ours_ge_dichotomy {
1692 debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
1693 if theirs_ge_dichotomy_set.contains(hash) {
1694 debug!("still present");
1696 continue;
1697 } else {
1698 let has_it = match hash {
1699 CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
1700 CS::State(state) => {
1701 let ch = current_channel.read();
1702 if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
1703 txn.is_tagged(txn.tags(&*ch), n.into())?
1704 } else {
1705 false
1706 }
1707 }
1708 };
1709 if has_it {
1710 remote_unrecs.push((*n, *hash))
1711 } else {
1712 continue;
1714 }
1715 }
1716 }
1717 Ok(remote_unrecs)
1718}