1use std::collections::HashSet;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, LazyLock};
5
6use anyhow::{Context, bail};
7use libpijul::DOT_DIR;
8use libpijul::pristine::{
9 Base32, ChangeId, ChangePosition, ChannelRef, GraphIter, Hash, Merkle, MutTxnT, Position,
10 RemoteRef, TxnT, sanakirja::MutTxn, sanakirja::RawMutTxnT,
11};
12use libpijul::{ChannelTxnT, DepsTxnT, GraphTxnT, MutTxnTExt, TxnTExt};
13use log::{debug, info};
14use regex::Regex;
15
16use pijul_config::remote::{RemoteConfig, RemoteHttpHeader};
17use pijul_identity::Complete;
18use pijul_repository::*;
19
20pub mod ssh;
21use ssh::*;
22
23pub mod local;
24use local::*;
25
26pub mod http;
27use http::*;
28
29use pijul_interaction::{
30 APPLY_MESSAGE, COMPLETE_MESSAGE, DOWNLOAD_MESSAGE, ProgressBar, Spinner, UPLOAD_MESSAGE,
31};
32
33pub const PROTOCOL_VERSION: usize = 3;
34
35pub enum RemoteRepo {
36 Local(Local),
37 Ssh(Ssh),
38 Http(Http),
39 LocalChannel(String),
40 None,
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
44pub enum CS {
45 Change(Hash),
46 State(Merkle),
47}
48
49pub async fn repository(
50 config: &pijul_config::Config,
51 self_path: Option<&Path>,
52 user: Option<&str>,
54 name: &str,
55 channel: &str,
56 no_cert_check: bool,
57 with_path: bool,
58) -> Result<RemoteRepo, anyhow::Error> {
59 if let Some(name) = config.remotes.iter().find(|e| e.name() == name) {
60 name.to_remote(config, channel, no_cert_check, with_path)
61 .await
62 } else {
63 unknown_remote(
64 config,
65 self_path,
66 user,
67 name,
68 channel,
69 no_cert_check,
70 with_path,
71 )
72 .await
73 }
74}
75
76pub async fn prove(
79 config: &pijul_config::Config,
80 identity: &Complete,
81 origin: Option<&str>,
82 no_cert_check: bool,
83 use_keyring: bool,
84) -> Result<(), anyhow::Error> {
85 let remote = origin.unwrap_or(&identity.config.author.origin);
86 let mut stderr = std::io::stderr();
87 writeln!(
88 stderr,
89 "Linking identity `{}` with {}@{}",
90 &identity.name, &identity.config.author.username, remote
91 )?;
92
93 let mut remote = repository(
94 config,
95 None,
96 Some(&identity.config.author.username),
97 &remote,
98 libpijul::DEFAULT_CHANNEL,
99 no_cert_check,
100 false,
101 )
102 .await?;
103
104 let (key, _password) =
105 identity
106 .credentials
107 .clone()
108 .unwrap()
109 .decrypt(config, &identity.name, use_keyring)?;
110 remote.prove(key).await?;
111
112 Ok(())
113}
114
115fn shell_cmd(s: &str) -> Result<String, anyhow::Error> {
116 let out = if cfg!(target_os = "windows") {
117 std::process::Command::new("cmd")
118 .args(&["/C", s])
119 .output()
120 .expect("failed to execute process")
121 } else {
122 std::process::Command::new(std::env::var("SHELL").unwrap_or("sh".to_string()))
123 .arg("-c")
124 .arg(s)
125 .output()
126 .expect("failed to execute process")
127 };
128 Ok(String::from_utf8(out.stdout)?.trim().to_string())
129}
130
131#[allow(async_fn_in_trait)]
132pub trait ToRemote {
133 async fn to_remote(
134 &self,
135 config: &pijul_config::Config,
136 channel: &str,
137 no_cert_check: bool,
138 with_path: bool,
139 ) -> Result<RemoteRepo, anyhow::Error>;
140}
141
142impl ToRemote for RemoteConfig {
143 async fn to_remote(
144 &self,
145 config: &pijul_config::Config,
146 channel: &str,
147 no_cert_check: bool,
148 with_path: bool,
149 ) -> Result<RemoteRepo, anyhow::Error> {
150 match self {
151 RemoteConfig::Ssh { ssh, .. } => {
152 if let Some(mut sshr) = ssh_remote(None, ssh, with_path) {
153 debug!("unknown_remote, ssh = {:?}", ssh);
154 if let Some(c) = sshr.connect(config, ssh, channel).await? {
155 return Ok(RemoteRepo::Ssh(c));
156 }
157 }
158 bail!("Remote not found: {:?}", ssh)
159 }
160 RemoteConfig::Http {
161 http,
162 headers,
163 name,
164 } => {
165 let mut h = Vec::new();
166 for (k, v) in headers.iter() {
167 match v {
168 RemoteHttpHeader::String(s) => {
169 h.push((k.clone(), s.clone()));
170 }
171 RemoteHttpHeader::Shell(shell) => {
172 h.push((k.clone(), shell_cmd(&shell.shell)?));
173 }
174 }
175 }
176 return Ok(RemoteRepo::Http(Http {
177 url: http.parse().unwrap(),
178 channel: channel.to_string(),
179 client: reqwest::ClientBuilder::new()
180 .danger_accept_invalid_certs(no_cert_check)
181 .build()?,
182 headers: h,
183 name: name.to_string(),
184 }));
185 }
186 }
187 }
188}
189
190pub async fn unknown_remote(
191 config: &pijul_config::Config,
192 self_path: Option<&Path>,
193 user: Option<&str>,
194 name: &str,
195 channel: &str,
196 no_cert_check: bool,
197 with_path: bool,
198) -> Result<RemoteRepo, anyhow::Error> {
199 if let Ok(url) = url::Url::parse(name) {
200 let scheme = url.scheme();
201 if scheme == "http" || scheme == "https" {
202 debug!("unknown_remote, http = {:?}", name);
203 return Ok(RemoteRepo::Http(Http {
204 url,
205 channel: channel.to_string(),
206 client: reqwest::ClientBuilder::new()
207 .danger_accept_invalid_certs(no_cert_check)
208 .build()?,
209 headers: Vec::new(),
210 name: name.to_string(),
211 }));
212 } else if scheme == "ssh" {
213 if let Some(mut ssh) = ssh_remote(user, name, with_path) {
214 debug!("unknown_remote, ssh = {:?}", ssh);
215 if let Some(c) = ssh.connect(config, name, channel).await? {
216 return Ok(RemoteRepo::Ssh(c));
217 }
218 }
219 bail!("Remote not found: {:?}", name)
220 } else {
221 bail!("Remote scheme not supported: {:?}", scheme)
222 }
223 }
224 if let Ok(root) = std::fs::canonicalize(name) {
225 if let Some(path) = self_path {
226 let path = std::fs::canonicalize(path)?;
227 if path == root {
228 return Ok(RemoteRepo::LocalChannel(channel.to_string()));
229 }
230 }
231
232 let mut dot_dir = root.join(DOT_DIR);
233 let changes_dir = dot_dir.join(CHANGES_DIR);
234
235 dot_dir.push(PRISTINE_DIR);
236 debug!("dot_dir = {:?}", dot_dir);
237 match libpijul::pristine::sanakirja::Pristine::new(&dot_dir.join("db")) {
238 Ok(pristine) => {
239 debug!("pristine done");
240 return Ok(RemoteRepo::Local(Local {
241 root: Path::new(name).to_path_buf(),
242 channel: channel.to_string(),
243 changes_dir,
244 pristine: Arc::new(pristine),
245 name: name.to_string(),
246 }));
247 }
248 Err(libpijul::pristine::sanakirja::SanakirjaError::Sanakirja(
249 sanakirja::Error::IO(e),
250 )) if e.kind() == std::io::ErrorKind::NotFound => {
251 debug!("repo not found")
252 }
253 Err(e) => return Err(e.into()),
254 }
255 }
256 if let Some(mut ssh) = ssh_remote(user, name, with_path) {
257 debug!("unknown_remote, ssh = {:?}", ssh);
258 if let Some(c) = ssh.connect(config, name, channel).await? {
259 return Ok(RemoteRepo::Ssh(c));
260 }
261 }
262 bail!("Remote not found: {:?}", name)
263}
264
265pub fn get_local_inodes<T: RawMutTxnT + 'static>(
267 txn: &mut MutTxn<T>,
268 channel: &ChannelRef<MutTxn<T>>,
269 repo: &Repository,
270 path: &[String],
271) -> Result<HashSet<Position<ChangeId>>, anyhow::Error> {
272 let mut paths = HashSet::new();
273 for path in path.iter() {
274 let (p, ambiguous) = txn.follow_oldest_path(&repo.changes, &channel, path)?;
275 if ambiguous {
276 bail!("Ambiguous path: {:?}", path)
277 }
278 paths.insert(p);
279 paths.extend(
280 libpijul::fs::iter_graph_descendants(txn, &channel.read(), p)?.map(|x| x.unwrap()),
281 );
282 }
283 Ok(paths)
284}
285
286pub struct PushDelta {
292 pub to_upload: Vec<CS>,
293 pub remote_unrecs: Vec<(u64, CS)>,
294 pub unknown_changes: Vec<CS>,
295}
296
297pub struct RemoteDelta<T: MutTxnTExt + TxnTExt> {
319 pub inodes: HashSet<Position<Hash>>,
320 pub to_download: Vec<CS>,
321 pub remote_ref: Option<RemoteRef<T>>,
322 pub ours_ge_dichotomy_set: HashSet<CS>,
323 pub theirs_ge_dichotomy_set: HashSet<CS>,
324 pub theirs_ge_dichotomy: Vec<(u64, Hash, Merkle, bool)>,
327 pub remote_unrecs: Vec<(u64, CS)>,
328}
329
330impl<T: RawMutTxnT + 'static> RemoteDelta<MutTxn<T>> {
331 pub fn to_local_channel_push(
334 self,
335 remote_channel: &str,
336 txn: &mut MutTxn<T>,
337 path: &[String],
338 channel: &ChannelRef<MutTxn<T>>,
339 repo: &Repository,
340 ) -> Result<PushDelta, anyhow::Error> {
341 let mut to_upload = Vec::new();
342 let inodes = get_local_inodes(txn, channel, repo, path)?;
343
344 for x in txn.reverse_log(&*channel.read(), None)? {
345 let (_, (h, _)) = x?;
346 if let Some(channel) = txn.load_channel(remote_channel)? {
347 let channel = channel.read();
348 let h_int = txn.get_internal(h)?.unwrap();
349 if txn.get_changeset(txn.changes(&channel), h_int)?.is_none() {
350 if inodes.is_empty() {
351 to_upload.push(CS::Change(h.into()))
352 } else {
353 for p in inodes.iter() {
354 if txn.get_touched_files(p, Some(h_int))?.is_some() {
355 to_upload.push(CS::Change(h.into()));
356 break;
357 }
358 }
359 }
360 }
361 }
362 }
363 assert!(self.ours_ge_dichotomy_set.is_empty());
364 assert!(self.theirs_ge_dichotomy_set.is_empty());
365 let d = PushDelta {
366 to_upload: to_upload.into_iter().rev().collect(),
367 remote_unrecs: self.remote_unrecs,
368 unknown_changes: Vec::new(),
369 };
370 assert!(d.remote_unrecs.is_empty());
371 Ok(d)
372 }
373
374 pub fn to_remote_push(
377 self,
378 txn: &mut MutTxn<T>,
379 path: &[String],
380 channel: &ChannelRef<MutTxn<T>>,
381 repo: &Repository,
382 ) -> Result<PushDelta, anyhow::Error> {
383 let mut to_upload = Vec::new();
384 let inodes = get_local_inodes(txn, channel, repo, path)?;
385 if let Some(ref remote_ref) = self.remote_ref {
386 let mut tags: HashSet<Merkle> = HashSet::new();
387 for x in txn.rev_iter_tags(&channel.read().tags, None)? {
388 let (n, m) = x?;
389 debug!("rev_iter_tags {:?} {:?}", n, m);
390 if let Some((_, p)) = txn.get_remote_tag(&remote_ref.lock().tags, (*n).into())? {
392 if p.b == m.b {
393 debug!("the remote has tag {:?}", p.a);
394 break;
395 }
396 if p.a != m.a {
397 }
401 } else {
402 tags.insert(m.a.into());
403 }
404 }
405 debug!("tags = {:?}", tags);
406 for x in txn.reverse_log(&*channel.read(), None)? {
407 let (_, (h, m)) = x?;
408 let h_unrecorded = self
409 .remote_unrecs
410 .iter()
411 .any(|(_, hh)| hh == &CS::Change(h.into()));
412 if !h_unrecorded {
413 if txn.remote_has_state(remote_ref, &m)?.is_some() {
414 debug!("remote_has_state: {:?}", m);
415 break;
416 }
417 }
418 let h_int = txn.get_internal(h)?.unwrap();
419 let h_deser = Hash::from(h);
420 if (!txn.remote_has_change(remote_ref, &h)? || h_unrecorded)
424 && !self.theirs_ge_dichotomy_set.contains(&CS::Change(h_deser))
425 {
426 if inodes.is_empty() {
427 if tags.remove(&m.into()) {
428 to_upload.push(CS::State(m.into()));
429 }
430 to_upload.push(CS::Change(h_deser));
431 } else {
432 for p in inodes.iter() {
433 if txn.get_touched_files(p, Some(h_int))?.is_some() {
434 to_upload.push(CS::Change(h_deser));
435 if tags.remove(&m.into()) {
436 to_upload.push(CS::State(m.into()));
437 }
438 break;
439 }
440 }
441 }
442 }
443 }
444 for t in tags.iter() {
445 if let Some(n) = txn.remote_has_state(&remote_ref, &t.into())? {
446 if !txn.is_tagged(&remote_ref.lock().tags, n)? {
447 to_upload.push(CS::State(*t));
448 }
449 } else {
450 debug!("the remote doesn't have state {:?}", t);
451 }
452 }
453 }
454
455 let mut unknown_changes = Vec::new();
459 for (_, h, m, is_tag) in self.theirs_ge_dichotomy.iter() {
460 let h_is_known = txn.get_revchanges(&channel, h).unwrap().is_some();
461 let change = CS::Change(*h);
462 if !(self.ours_ge_dichotomy_set.contains(&change) || h_is_known) {
463 unknown_changes.push(change)
464 }
465 if *is_tag {
466 let m_is_known = if let Some(n) = txn
467 .channel_has_state(txn.states(&*channel.read()), &m.into())
468 .unwrap()
469 {
470 txn.is_tagged(txn.tags(&*channel.read()), n.into()).unwrap()
471 } else {
472 false
473 };
474 if !m_is_known {
475 unknown_changes.push(CS::State(*m))
476 }
477 }
478 }
479
480 Ok(PushDelta {
481 to_upload: to_upload.into_iter().rev().collect(),
482 remote_unrecs: self.remote_unrecs,
483 unknown_changes,
484 })
485 }
486}
487
488pub fn update_changelist_local_channel<T: RawMutTxnT + 'static>(
492 remote_channel: &str,
493 txn: &mut MutTxn<T>,
494 path: &[String],
495 current_channel: &ChannelRef<MutTxn<T>>,
496 repo: &Repository,
497 specific_changes: &[String],
498) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
499 if !specific_changes.is_empty() {
500 let mut to_download = Vec::new();
501 for h in specific_changes {
502 let h = txn.hash_from_prefix(h)?.0;
503 if txn.get_revchanges(current_channel, &h)?.is_none() {
504 to_download.push(CS::Change(h));
505 }
506 }
507 Ok(RemoteDelta {
508 inodes: HashSet::new(),
509 to_download,
510 remote_ref: None,
511 ours_ge_dichotomy_set: HashSet::new(),
512 theirs_ge_dichotomy: Vec::new(),
513 theirs_ge_dichotomy_set: HashSet::new(),
514 remote_unrecs: Vec::new(),
515 })
516 } else {
517 let mut inodes = HashSet::new();
518 let inodes_ = get_local_inodes(txn, current_channel, repo, path)?;
519 let mut to_download = Vec::new();
520 inodes.extend(inodes_.iter().map(|x| libpijul::pristine::Position {
521 change: txn.get_external(&x.change).unwrap().unwrap().into(),
522 pos: x.pos,
523 }));
524 if let Some(remote_channel) = txn.load_channel(remote_channel)? {
525 let remote_channel = remote_channel.read();
526 for x in txn.reverse_log(&remote_channel, None)? {
527 let (_, (h, m)) = x?;
528 if txn
529 .channel_has_state(txn.states(&*current_channel.read()), &m)?
530 .is_some()
531 {
532 break;
533 }
534 let h_int = txn.get_internal(h)?.unwrap();
535 if txn
536 .get_changeset(txn.changes(&*current_channel.read()), h_int)?
537 .is_none()
538 {
539 if inodes_.is_empty()
540 || inodes_.iter().any(|&inode| {
541 txn.get_rev_touched_files(h_int, Some(&inode))
542 .unwrap()
543 .is_some()
544 })
545 {
546 to_download.push(CS::Change(h.into()));
547 }
548 }
549 }
550 }
551 Ok(RemoteDelta {
552 inodes,
553 to_download,
554 remote_ref: None,
555 ours_ge_dichotomy_set: HashSet::new(),
556 theirs_ge_dichotomy: Vec::new(),
557 theirs_ge_dichotomy_set: HashSet::new(),
558 remote_unrecs: Vec::new(),
559 })
560 }
561}
562
563impl RemoteRepo {
564 fn name(&self) -> Option<&str> {
565 match *self {
566 RemoteRepo::Ssh(ref s) => Some(s.name.as_str()),
567 RemoteRepo::Local(ref l) => Some(l.name.as_str()),
568 RemoteRepo::Http(ref h) => Some(h.name.as_str()),
569 RemoteRepo::LocalChannel(_) => None,
570 RemoteRepo::None => unreachable!(),
571 }
572 }
573
574 pub fn repo_name(&self) -> Result<Option<String>, anyhow::Error> {
575 match *self {
576 RemoteRepo::Ssh(ref s) => {
577 if let Some(sep) = s.name.rfind(|c| c == ':' || c == '/') {
578 Ok(Some(s.name.split_at(sep + 1).1.to_string()))
579 } else {
580 Ok(Some(s.name.as_str().to_string()))
581 }
582 }
583 RemoteRepo::Local(ref l) => {
584 if let Some(file) = l.root.file_name() {
585 Ok(Some(
586 file.to_str()
587 .context("failed to decode local repository name")?
588 .to_string(),
589 ))
590 } else {
591 Ok(None)
592 }
593 }
594 RemoteRepo::Http(ref h) => {
595 if let Some(name) = libpijul::path::file_name(h.url.path()) {
596 if !name.trim().is_empty() {
597 return Ok(Some(name.trim().to_string()));
598 }
599 }
600 Ok(h.url.host().map(|h| h.to_string()))
601 }
602 RemoteRepo::LocalChannel(_) => Ok(None),
603 RemoteRepo::None => unreachable!(),
604 }
605 }
606
607 pub async fn finish(&mut self) -> Result<(), anyhow::Error> {
608 if let RemoteRepo::Ssh(s) = self {
609 s.finish().await?
610 }
611 Ok(())
612 }
613
614 pub async fn update_changelist<T: MutTxnTExt + TxnTExt + 'static>(
615 &mut self,
616 txn: &mut T,
617 path: &[String],
618 ) -> Result<Option<(HashSet<Position<Hash>>, RemoteRef<T>)>, anyhow::Error> {
619 debug!("update_changelist");
620 let id = if let Some(id) = self.get_id(txn).await? {
621 id
622 } else {
623 return Ok(None);
624 };
625 let mut remote = if let Some(name) = self.name() {
626 txn.open_or_create_remote(id, name)?
627 } else {
628 return Ok(None);
629 };
630 let n = self.dichotomy_changelist(txn, &remote.lock()).await?;
631 debug!("update changelist {:?}", n);
632 let v: Vec<_> = txn
633 .iter_remote(&remote.lock().remote, n)?
634 .filter_map(|k| {
635 debug!("filter_map {:?}", k);
636 let k = (*k.unwrap().0).into();
637 if k >= n { Some(k) } else { None }
638 })
639 .collect();
640 for k in v {
641 debug!("deleting {:?}", k);
642 txn.del_remote(&mut remote, k)?;
643 }
644 let v: Vec<_> = txn
645 .iter_tags(&remote.lock().tags, n)?
646 .filter_map(|k| {
647 debug!("filter_map {:?}", k);
648 let k = (*k.unwrap().0).into();
649 if k >= n { Some(k) } else { None }
650 })
651 .collect();
652 for k in v {
653 debug!("deleting {:?}", k);
654 txn.del_tags(&mut remote.lock().tags, k)?;
655 }
656
657 debug!("deleted");
658 let paths = self.download_changelist(txn, &mut remote, n, path).await?;
659 Ok(Some((paths, remote)))
660 }
661
662 async fn update_changelist_pushpull_from_scratch<T: RawMutTxnT>(
663 &mut self,
664 txn: &mut MutTxn<T>,
665 path: &[String],
666 current_channel: &ChannelRef<MutTxn<T>>,
667 ) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
668 debug!("no id, starting from scratch");
669 let (inodes, theirs_ge_dichotomy) = self.download_changelist_nocache(0, path).await?;
670 let mut theirs_ge_dichotomy_set = HashSet::new();
671 let mut to_download = Vec::new();
672 for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
673 theirs_ge_dichotomy_set.insert(CS::Change(*h));
674 if txn.get_revchanges(current_channel, h)?.is_none() {
675 to_download.push(CS::Change(*h));
676 }
677 if *is_tag {
678 let ch = current_channel.read();
679 if let Some(n) = txn.channel_has_state(txn.states(&*ch), &m.into())? {
680 if !txn.is_tagged(txn.tags(&*ch), n.into())? {
681 to_download.push(CS::State(*m));
682 }
683 } else {
684 to_download.push(CS::State(*m));
685 }
686 }
687 }
688 Ok(RemoteDelta {
689 inodes,
690 remote_ref: None,
691 to_download,
692 ours_ge_dichotomy_set: HashSet::new(),
693 theirs_ge_dichotomy,
694 theirs_ge_dichotomy_set,
695 remote_unrecs: Vec::new(),
696 })
697 }
698
699 pub async fn update_changelist_pushpull<T: RawMutTxnT + 'static>(
712 &mut self,
713 txn: &mut MutTxn<T>,
714 path: &[String],
715 current_channel: &ChannelRef<MutTxn<T>>,
716 force_cache: Option<bool>,
717 repo: &Repository,
718 specific_changes: &[String],
719 is_pull: bool,
720 ) -> Result<RemoteDelta<MutTxn<T>>, anyhow::Error> {
721 debug!("update_changelist_pushpull");
722 if let RemoteRepo::LocalChannel(c) = self {
723 return update_changelist_local_channel(
724 c,
725 txn,
726 path,
727 current_channel,
728 repo,
729 specific_changes,
730 );
731 }
732
733 let id = if let Some(id) = self.get_id(txn).await? {
734 debug!("id = {:?}", id);
735 id
736 } else {
737 return self
738 .update_changelist_pushpull_from_scratch(txn, path, current_channel)
739 .await;
740 };
741 let mut remote_ref = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
742 let dichotomy_n = self.dichotomy_changelist(txn, &remote_ref.lock()).await?;
743 let ours_ge_dichotomy: Vec<(u64, CS)> = txn
744 .iter_remote(&remote_ref.lock().remote, dichotomy_n)?
745 .filter_map(|k| {
746 debug!("filter_map {:?}", k);
747 match k.unwrap() {
748 (k, libpijul::pristine::Pair { a: hash, .. }) => {
749 let (k, hash) = (u64::from(*k), Hash::from(*hash));
750 if k >= dichotomy_n {
751 Some((k, CS::Change(hash)))
752 } else {
753 None
754 }
755 }
756 }
757 })
758 .collect();
759 let (inodes, theirs_ge_dichotomy) =
760 self.download_changelist_nocache(dichotomy_n, path).await?;
761 debug!("theirs_ge_dichotomy = {:?}", theirs_ge_dichotomy);
762 let ours_ge_dichotomy_set = ours_ge_dichotomy
763 .iter()
764 .map(|(_, h)| h)
765 .copied()
766 .collect::<HashSet<CS>>();
767 let mut theirs_ge_dichotomy_set = HashSet::new();
768 for (_, h, m, is_tag) in theirs_ge_dichotomy.iter() {
769 theirs_ge_dichotomy_set.insert(CS::Change(*h));
770 if *is_tag {
771 theirs_ge_dichotomy_set.insert(CS::State(*m));
772 }
773 }
774
775 let remote_unrecs = remote_unrecs(
777 txn,
778 current_channel,
779 &ours_ge_dichotomy,
780 &theirs_ge_dichotomy_set,
781 )?;
782 let should_cache = if let Some(true) = force_cache {
783 true
784 } else {
785 remote_unrecs.is_empty()
786 };
787 debug!(
788 "should_cache = {:?} {:?} {:?}",
789 force_cache, remote_unrecs, should_cache
790 );
791 if should_cache {
792 use libpijul::ChannelMutTxnT;
793 for (k, t) in ours_ge_dichotomy.iter().copied() {
794 match t {
795 CS::State(_) => txn.del_tags(&mut remote_ref.lock().tags, k)?,
796 CS::Change(_) => {
797 txn.del_remote(&mut remote_ref, k)?;
798 }
799 }
800 }
801 for (n, h, m, is_tag) in theirs_ge_dichotomy.iter().copied() {
802 debug!("theirs: {:?} {:?} {:?}", n, h, m);
803 txn.put_remote(&mut remote_ref, n, (h, m))?;
804 if is_tag {
805 txn.put_tags(&mut remote_ref.lock().tags, n, &m)?;
806 }
807 }
808 }
809 if !specific_changes.is_empty() {
810 let to_download = specific_changes
812 .iter()
813 .map(|h| {
814 if is_pull {
815 {
816 if let Ok(t) = txn.state_from_prefix(&remote_ref.lock().states, h) {
817 return Ok(CS::State(t.0));
818 }
819 }
820 Ok(CS::Change(txn.hash_from_prefix_remote(&remote_ref, h)?))
821 } else {
822 if let Ok(t) = txn.state_from_prefix(¤t_channel.read().states, h) {
823 Ok(CS::State(t.0))
824 } else {
825 Ok(CS::Change(txn.hash_from_prefix(h)?.0))
826 }
827 }
828 })
829 .collect::<Result<Vec<_>, anyhow::Error>>();
830 Ok(RemoteDelta {
831 inodes,
832 remote_ref: Some(remote_ref),
833 to_download: to_download?,
834 ours_ge_dichotomy_set,
835 theirs_ge_dichotomy,
836 theirs_ge_dichotomy_set,
837 remote_unrecs,
838 })
839 } else {
840 let mut to_download: Vec<CS> = Vec::new();
841 let mut to_download_ = HashSet::new();
842 for x in txn.iter_rev_remote(&remote_ref.lock().remote, None)? {
843 let (_, p) = x?;
844 let h: Hash = p.a.into();
845 if txn
846 .channel_has_state(txn.states(¤t_channel.read()), &p.b)
847 .unwrap()
848 .is_some()
849 {
850 break;
851 }
852 if txn.get_revchanges(¤t_channel, &h).unwrap().is_none() {
853 let h = CS::Change(h);
854 if to_download_.insert(h.clone()) {
855 to_download.push(h);
856 }
857 }
858 }
859
860 for (n, h, m, is_tag) in theirs_ge_dichotomy.iter() {
863 debug!(
864 "update_changelist_pushpull line {}, {:?} {:?}",
865 line!(),
866 n,
867 h
868 );
869 let ch = CS::Change(*h);
871 if txn.get_revchanges(¤t_channel, h).unwrap().is_none() {
872 if to_download_.insert(ch.clone()) {
873 to_download.push(ch.clone());
874 }
875 if *is_tag {
876 to_download.push(CS::State(*m));
877 }
878 } else if *is_tag {
879 let has_tag = if let Some(n) =
880 txn.channel_has_state(txn.states(¤t_channel.read()), &m.into())?
881 {
882 txn.is_tagged(txn.tags(¤t_channel.read()), n.into())?
883 } else {
884 false
885 };
886 if !has_tag {
887 to_download.push(CS::State(*m));
888 }
889 }
890 if should_cache && ours_ge_dichotomy_set.get(&ch).is_none() {
893 use libpijul::ChannelMutTxnT;
894 txn.put_remote(&mut remote_ref, *n, (*h, *m))?;
895 if *is_tag {
896 let mut rem = remote_ref.lock();
897 txn.put_tags(&mut rem.tags, *n, m)?;
898 }
899 }
900 }
901 Ok(RemoteDelta {
902 inodes,
903 remote_ref: Some(remote_ref),
904 to_download,
905 ours_ge_dichotomy_set,
906 theirs_ge_dichotomy,
907 theirs_ge_dichotomy_set,
908 remote_unrecs,
909 })
910 }
911 }
912
913 pub async fn download_changelist_nocache(
917 &mut self,
918 from: u64,
919 paths: &[String],
920 ) -> Result<(HashSet<Position<Hash>>, Vec<(u64, Hash, Merkle, bool)>), anyhow::Error> {
921 let mut v = Vec::new();
922 let f = |v: &mut Vec<(u64, Hash, Merkle, bool)>, n, h, m, m2| {
923 debug!("no cache: {:?}", h);
924 Ok(v.push((n, h, m, m2)))
925 };
926 let r = match *self {
927 RemoteRepo::Local(ref mut l) => l.download_changelist(f, &mut v, from, paths)?,
928 RemoteRepo::Ssh(ref mut s) => s.download_changelist(f, &mut v, from, paths).await?,
929 RemoteRepo::Http(ref h) => h.download_changelist(f, &mut v, from, paths).await?,
930 RemoteRepo::LocalChannel(_) => HashSet::new(),
931 RemoteRepo::None => unreachable!(),
932 };
933 Ok((r, v))
934 }
935
936 async fn dichotomy_changelist<T: MutTxnT + TxnTExt>(
940 &mut self,
941 txn: &T,
942 remote: &libpijul::pristine::Remote<T>,
943 ) -> Result<u64, anyhow::Error> {
944 let mut a = 0;
945 let (mut b, state): (_, Merkle) = if let Some((u, v)) = txn.last_remote(&remote.remote)? {
946 debug!("dichotomy_changelist: {:?} {:?}", u, v);
947 (u, (&v.b).into())
948 } else {
949 debug!("the local copy of the remote has no changes");
950 return Ok(0);
951 };
952 let last_statet = if let Some((_, _, v)) = txn.last_remote_tag(&remote.tags)? {
953 v.into()
954 } else {
955 Merkle::zero()
956 };
957 debug!("last_state: {:?} {:?}", state, last_statet);
958 if let Some((_, s, st)) = self.get_state(txn, Some(b)).await? {
959 debug!("remote last_state: {:?} {:?}", s, st);
960 if s == state && st == last_statet {
961 return Ok(b + 1);
963 }
964 }
965 while a < b {
969 let mid = (a + b) / 2;
970 let (mid, state) = {
971 let (a, b) = txn.get_remote_state(&remote.remote, mid)?.unwrap();
972 (a, b.b)
973 };
974 let statet = if let Some((_, b)) = txn.get_remote_tag(&remote.tags, mid)? {
975 b.b.into()
978 } else {
979 last_statet
982 };
983
984 let remote_state = self.get_state(txn, Some(mid)).await?;
985 debug!("dichotomy {:?} {:?} {:?}", mid, state, remote_state);
986 if let Some((_, remote_state, remote_statet)) = remote_state {
987 if remote_state == state && remote_statet == statet {
988 if a == mid {
989 return Ok(a + 1);
990 } else {
991 a = mid;
992 continue;
993 }
994 }
995 }
996 if b == mid {
997 break;
998 } else {
999 b = mid
1000 }
1001 }
1002 Ok(a)
1003 }
1004
1005 async fn get_state<T: libpijul::TxnTExt>(
1006 &mut self,
1007 txn: &T,
1008 mid: Option<u64>,
1009 ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
1010 match *self {
1011 RemoteRepo::Local(ref mut l) => l.get_state(mid),
1012 RemoteRepo::Ssh(ref mut s) => s.get_state(mid).await,
1013 RemoteRepo::Http(ref mut h) => h.get_state(mid).await,
1014 RemoteRepo::LocalChannel(ref channel) => {
1015 if let Some(channel) = txn.load_channel(&channel)? {
1016 local::get_state(txn, &channel, mid)
1017 } else {
1018 Ok(None)
1019 }
1020 }
1021 RemoteRepo::None => unreachable!(),
1022 }
1023 }
1024
1025 async fn get_id<T: libpijul::TxnTExt + 'static>(
1029 &mut self,
1030 txn: &T,
1031 ) -> Result<Option<libpijul::pristine::RemoteId>, anyhow::Error> {
1032 match *self {
1033 RemoteRepo::Local(ref l) => Ok(Some(l.get_id()?)),
1034 RemoteRepo::Ssh(ref mut s) => s.get_id().await,
1035 RemoteRepo::Http(ref h) => h.get_id().await,
1036 RemoteRepo::LocalChannel(ref channel) => {
1037 if let Some(channel) = txn.load_channel(&channel)? {
1038 Ok(txn.id(&*channel.read()).cloned())
1039 } else {
1040 Err(anyhow::anyhow!(
1041 "Unable to retrieve RemoteId for LocalChannel remote"
1042 ))
1043 }
1044 }
1045 RemoteRepo::None => unreachable!(),
1046 }
1047 }
1048
1049 pub async fn archive<W: std::io::Write + Send + 'static>(
1050 &mut self,
1051 prefix: Option<String>,
1052 state: Option<(Merkle, &[Hash])>,
1053 umask: u16,
1054 w: W,
1055 ) -> Result<u64, anyhow::Error> {
1056 match *self {
1057 RemoteRepo::Local(ref mut l) => {
1058 debug!("archiving local repo");
1059 let changes = libpijul::changestore::filesystem::FileSystem::from_root(
1060 &l.root,
1061 pijul_repository::max_files(),
1062 );
1063 let working_copy =
1064 libpijul::working_copy::filesystem::FileSystem::from_root(&l.root);
1065 let mut tarball = libpijul::output::Tarball::new(w, prefix, umask);
1066 let conflicts = if let Some((state, extra)) = state {
1067 let txn = l.pristine.arc_txn_begin()?;
1068 let channel = {
1069 let txn = txn.read();
1070 txn.load_channel(&l.channel)?.unwrap()
1071 };
1072 txn.archive_with_state(
1073 &changes,
1074 &channel,
1075 &state,
1076 extra,
1077 &mut tarball,
1078 0,
1079 &working_copy,
1080 )?
1081 } else {
1082 let txn = l.pristine.arc_txn_begin()?;
1083 let channel = {
1084 let txn = txn.read();
1085 txn.load_channel(&l.channel)?.unwrap()
1086 };
1087 txn.archive::<_, _, libpijul::working_copy::filesystem::FileSystem>(
1088 &changes,
1089 &channel,
1090 &mut tarball,
1091 )?
1092 };
1093 Ok(conflicts.len() as u64)
1094 }
1095 RemoteRepo::Ssh(ref mut s) => s.archive(prefix, state, w).await,
1096 RemoteRepo::Http(ref mut h) => h.archive(prefix, state, w).await,
1097 RemoteRepo::LocalChannel(_) => unreachable!(),
1098 RemoteRepo::None => unreachable!(),
1099 }
1100 }
1101
1102 async fn download_changelist<T: MutTxnTExt>(
1103 &mut self,
1104 txn: &mut T,
1105 remote: &mut RemoteRef<T>,
1106 from: u64,
1107 paths: &[String],
1108 ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
1109 let f = |a: &mut (&mut T, &mut RemoteRef<T>), n, h, m, is_tag| {
1110 let (ref mut txn, ref mut remote) = *a;
1111 txn.put_remote(remote, n, (h, m))?;
1112 if is_tag {
1113 txn.put_tags(&mut remote.lock().tags, n, &m.into())?;
1114 }
1115 Ok(())
1116 };
1117 match *self {
1118 RemoteRepo::Local(ref mut l) => {
1119 l.download_changelist(f, &mut (txn, remote), from, paths)
1120 }
1121 RemoteRepo::Ssh(ref mut s) => {
1122 s.download_changelist(f, &mut (txn, remote), from, paths)
1123 .await
1124 }
1125 RemoteRepo::Http(ref h) => {
1126 h.download_changelist(f, &mut (txn, remote), from, paths)
1127 .await
1128 }
1129 RemoteRepo::LocalChannel(_) => Ok(HashSet::new()),
1130 RemoteRepo::None => unreachable!(),
1131 }
1132 }
1133
1134 pub async fn upload_changes<T: MutTxnTExt + 'static>(
1135 &mut self,
1136 txn: &mut T,
1137 local: PathBuf,
1138 to_channel: Option<&str>,
1139 changes: &[CS],
1140 ) -> Result<(), anyhow::Error> {
1141 let upload_bar = ProgressBar::new(changes.len() as u64, UPLOAD_MESSAGE)?;
1142
1143 match self {
1144 RemoteRepo::Local(l) => l.upload_changes(upload_bar, local, to_channel, changes)?,
1145 RemoteRepo::Ssh(s) => {
1146 s.upload_changes(upload_bar, local, to_channel, changes)
1147 .await?
1148 }
1149 &mut RemoteRepo::Http(ref h) => {
1150 h.upload_changes(upload_bar, local, to_channel, changes)
1151 .await?
1152 }
1153 &mut RemoteRepo::LocalChannel(ref channel) => {
1154 let mut channel = txn.open_or_create_channel(channel)?;
1155 let store = libpijul::changestore::filesystem::FileSystem::from_changes(
1156 local,
1157 pijul_repository::max_files(),
1158 );
1159 local::upload_changes(upload_bar, &store, txn, &mut channel, changes)?
1160 }
1161 RemoteRepo::None => unreachable!(),
1162 }
1163 Ok(())
1164 }
1165
1166 pub async fn download_changes(
1168 &mut self,
1169 progress_bar: ProgressBar,
1170 hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
1171 send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
1172 path: &mut PathBuf,
1173 full: bool,
1174 ) -> Result<bool, anyhow::Error> {
1175 debug!("download_changes");
1176 match *self {
1177 RemoteRepo::Local(ref mut l) => {
1178 l.download_changes(progress_bar, hashes, send, path).await?
1179 }
1180 RemoteRepo::Ssh(ref mut s) => {
1181 s.download_changes(progress_bar, hashes, send, path, full)
1182 .await?
1183 }
1184 RemoteRepo::Http(ref mut h) => {
1185 h.download_changes(progress_bar, hashes, send, path, full)
1186 .await?
1187 }
1188 RemoteRepo::LocalChannel(_) => {
1189 while let Some(c) = hashes.recv().await {
1190 send.send((c, true)).await?;
1191 }
1192 }
1193 RemoteRepo::None => unreachable!(),
1194 }
1195 Ok(true)
1196 }
1197
1198 pub async fn update_identities<T: MutTxnTExt + TxnTExt + GraphIter>(
1199 &mut self,
1200 repo: &mut Repository,
1201 remote: &RemoteRef<T>,
1202 ) -> Result<(), anyhow::Error> {
1203 debug!("Downloading identities");
1204 let mut id_path = repo.path.clone();
1205 id_path.push(DOT_DIR);
1206 id_path.push("identities");
1207 let rev = None;
1208 let r = match *self {
1209 RemoteRepo::Local(ref mut l) => l.update_identities(rev, id_path).await?,
1210 RemoteRepo::Ssh(ref mut s) => s.update_identities(rev, id_path).await?,
1211 RemoteRepo::Http(ref mut h) => h.update_identities(rev, id_path).await?,
1212 RemoteRepo::LocalChannel(_) => 0,
1213 RemoteRepo::None => unreachable!(),
1214 };
1215 remote.set_id_revision(r);
1216 Ok(())
1217 }
1218
1219 pub async fn prove(&mut self, key: libpijul::key::SKey) -> Result<(), anyhow::Error> {
1220 match *self {
1221 RemoteRepo::Ssh(ref mut s) => s.prove(key).await,
1222 RemoteRepo::Http(ref mut h) => h.prove(key).await,
1223 RemoteRepo::None => unreachable!(),
1224 _ => Ok(()),
1225 }
1226 }
1227
1228 pub async fn pull<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1229 &mut self,
1230 repo: &mut Repository,
1231 txn: &mut T,
1232 channel: &mut ChannelRef<T>,
1233 to_apply: &[CS],
1234 inodes: &HashSet<Position<Hash>>,
1235 do_apply: bool,
1236 ) -> Result<Vec<CS>, anyhow::Error> {
1237 let apply_len = to_apply.len() as u64;
1238 let download_bar = ProgressBar::new(apply_len, DOWNLOAD_MESSAGE)?;
1239 let apply_bar = if do_apply {
1240 Some(ProgressBar::new(apply_len, APPLY_MESSAGE)?)
1241 } else {
1242 None
1243 };
1244
1245 let (mut send, recv) = tokio::sync::mpsc::channel(100);
1246
1247 let mut self_ = std::mem::replace(self, RemoteRepo::None);
1248 let (hash_send, mut hash_recv) = tokio::sync::mpsc::unbounded_channel();
1249 let mut change_path_ = repo.path.clone();
1250 change_path_.push(DOT_DIR);
1251 change_path_.push("changes");
1252 let cloned_download_bar = download_bar.clone();
1253 let t = tokio::spawn(async move {
1254 self_
1255 .download_changes(
1256 cloned_download_bar,
1257 &mut hash_recv,
1258 &mut send,
1259 &mut change_path_,
1260 false,
1261 )
1262 .await?;
1263
1264 Ok::<_, anyhow::Error>(self_)
1265 });
1266
1267 let mut change_path_ = repo.changes_dir.clone();
1268 let mut waiting = 0;
1269 let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1270
1271 let mut asked = HashSet::new();
1272 for h in to_apply {
1273 debug!("to_apply {:?}", h);
1274 match h {
1275 CS::Change(h) => {
1276 libpijul::changestore::filesystem::push_filename(&mut change_path_, h);
1277 }
1278 CS::State(h) => {
1279 libpijul::changestore::filesystem::push_tag_filename(&mut change_path_, h);
1280 }
1281 }
1282 asked.insert(*h);
1283 hash_send.send(*h)?;
1284 waiting += 1;
1285 libpijul::changestore::filesystem::pop_filename(&mut change_path_);
1286 }
1287
1288 let u = self
1289 .download_changes_rec(
1290 repo,
1291 hash_send,
1292 recv,
1293 send_ready,
1294 download_bar,
1295 waiting,
1296 asked,
1297 )
1298 .await?;
1299
1300 let mut ws = libpijul::ApplyWorkspace::new();
1301 let mut to_apply_inodes = HashSet::new();
1302 while let Some(h) = recv_ready.recv().await {
1303 debug!("to_apply: {:?}", h);
1304 let touches_inodes = inodes.is_empty()
1305 || {
1306 debug!("inodes = {:?}", inodes);
1307 use libpijul::changestore::ChangeStore;
1308 if let CS::Change(ref h) = h {
1309 let changes = repo.changes.get_changes(h)?;
1310 changes.iter().any(|c| {
1311 c.iter().any(|c| {
1312 let inode = c.inode();
1313 debug!("inode = {:?}", inode);
1314 inodes.contains(&Position {
1315 change: inode.change.unwrap_or(*h),
1316 pos: inode.pos,
1317 })
1318 })
1319 })
1320 } else {
1321 false
1322 }
1323 }
1324 || { inodes.iter().any(|i| CS::Change(i.change) == h) };
1325
1326 if touches_inodes {
1327 to_apply_inodes.insert(h);
1328 } else {
1329 continue;
1330 }
1331
1332 if let Some(apply_bar) = apply_bar.clone() {
1333 info!("Applying {:?}", h);
1334 apply_bar.inc(1);
1335 debug!("apply");
1336 if let CS::Change(h) = h {
1337 let mut channel = channel.write();
1338 txn.apply_change_rec_ws(&repo.changes, &mut channel, &h, &mut ws)?;
1339 }
1340 debug!("applied");
1341 } else {
1342 debug!("not applying {:?}", h)
1343 }
1344 }
1345
1346 let mut result = Vec::with_capacity(to_apply_inodes.len());
1347 for h in to_apply {
1348 if to_apply_inodes.contains(&h) {
1349 result.push(*h)
1350 }
1351 }
1352
1353 debug!("finished");
1354 debug!("waiting for spawned process");
1355 *self = t.await??;
1356 u.await??;
1357 Ok(result)
1358 }
1359
1360 async fn download_changes_rec(
1361 &mut self,
1362 repo: &mut Repository,
1363 send_hash: tokio::sync::mpsc::UnboundedSender<CS>,
1364 mut recv_signal: tokio::sync::mpsc::Receiver<(CS, bool)>,
1365 send_ready: tokio::sync::mpsc::Sender<CS>,
1366 progress_bar: ProgressBar,
1367 mut waiting: usize,
1368 mut asked: HashSet<CS>,
1369 ) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
1370 let mut dep_path = repo.changes_dir.clone();
1371 let changes = repo.changes.clone();
1372 let t = tokio::spawn(async move {
1373 if waiting == 0 {
1374 return Ok(());
1375 }
1376 let mut ready = Vec::new();
1377 while let Some((hash, follow)) = recv_signal.recv().await {
1378 debug!("received {:?} {:?}", hash, follow);
1379 if let CS::Change(hash) = hash {
1380 waiting -= 1;
1381 if follow {
1382 use libpijul::changestore::ChangeStore;
1383 let mut needs_dep = false;
1384 for dep in changes.get_dependencies(&hash)? {
1385 let dep: libpijul::pristine::Hash = dep;
1386
1387 libpijul::changestore::filesystem::push_filename(&mut dep_path, &dep);
1388 let has_dep = std::fs::metadata(&dep_path).is_ok();
1389 libpijul::changestore::filesystem::pop_filename(&mut dep_path);
1390
1391 if !has_dep {
1392 needs_dep = true;
1393 if asked.insert(CS::Change(dep)) {
1394 progress_bar.inc(1);
1395 send_hash.send(CS::Change(dep))?;
1396 waiting += 1
1397 }
1398 }
1399 }
1400
1401 if !needs_dep {
1402 send_ready.send(CS::Change(hash)).await?;
1403 } else {
1404 ready.push(CS::Change(hash))
1405 }
1406 } else {
1407 send_ready.send(CS::Change(hash)).await?;
1408 }
1409 }
1410 if waiting == 0 {
1411 break;
1412 }
1413 }
1414 info!("waiting loop done");
1415 for r in ready {
1416 send_ready.send(r).await?;
1417 }
1418 std::mem::drop(recv_signal);
1419 Ok(())
1420 });
1421 Ok(t)
1422 }
1423
1424 pub async fn clone_tag<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1425 &mut self,
1426 repo: &mut Repository,
1427 txn: &mut T,
1428 channel: &mut ChannelRef<T>,
1429 tag: &[Hash],
1430 ) -> Result<(), anyhow::Error> {
1431 let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1432 let (mut send_signal, recv_signal) = tokio::sync::mpsc::channel(100);
1433 let mut self_ = std::mem::replace(self, RemoteRepo::None);
1434 let mut change_path_ = repo.changes_dir.clone();
1435 let download_bar = ProgressBar::new(tag.len() as u64, DOWNLOAD_MESSAGE)?;
1436 let cloned_download_bar = download_bar.clone();
1437
1438 let t = tokio::spawn(async move {
1439 self_
1440 .download_changes(
1441 cloned_download_bar,
1442 &mut recv_hash,
1443 &mut send_signal,
1444 &mut change_path_,
1445 false,
1446 )
1447 .await?;
1448 Ok(self_)
1449 });
1450
1451 let mut waiting = 0;
1452 let mut asked = HashSet::new();
1453 for &h in tag.iter() {
1454 waiting += 1;
1455 send_hash.send(CS::Change(h))?;
1456 asked.insert(CS::Change(h));
1457 }
1458
1459 let (send_ready, mut recv_ready) = tokio::sync::mpsc::channel(100);
1460
1461 let u = self
1462 .download_changes_rec(
1463 repo,
1464 send_hash,
1465 recv_signal,
1466 send_ready,
1467 download_bar,
1468 waiting,
1469 asked,
1470 )
1471 .await?;
1472
1473 let mut hashes = Vec::new();
1474 let mut ws = libpijul::ApplyWorkspace::new();
1475 {
1476 let mut channel_ = channel.write();
1477 while let Some(hash) = recv_ready.recv().await {
1478 if let CS::Change(ref hash) = hash {
1479 txn.apply_change_rec_ws(&repo.changes, &mut channel_, hash, &mut ws)?;
1480 }
1481 hashes.push(hash);
1482 }
1483 }
1484 let r: Result<_, anyhow::Error> = t.await?;
1485 *self = r?;
1486 u.await??;
1487 self.complete_changes(repo, txn, channel, &hashes, false)
1488 .await?;
1489 Ok(())
1490 }
1491
1492 pub async fn clone_state<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1493 &mut self,
1494 repo: &mut Repository,
1495 txn: &mut T,
1496 channel: &mut ChannelRef<T>,
1497 state: Merkle,
1498 ) -> Result<(), anyhow::Error> {
1499 let id = if let Some(id) = self.get_id(txn).await? {
1500 id
1501 } else {
1502 return Ok(());
1503 };
1504 self.update_changelist(txn, &[]).await?;
1505 let remote = txn.open_or_create_remote(id, self.name().unwrap()).unwrap();
1506 let mut to_pull = Vec::new();
1507 let mut found = false;
1508 for x in txn.iter_remote(&remote.lock().remote, 0)? {
1509 let (n, p) = x?;
1510 debug!("{:?} {:?}", n, p);
1511 to_pull.push(CS::Change(p.a.into()));
1512 if p.b == state {
1513 found = true;
1514 break;
1515 }
1516 }
1517 if !found {
1518 bail!("State not found: {:?}", state)
1519 }
1520 self.pull(repo, txn, channel, &to_pull, &HashSet::new(), true)
1521 .await?;
1522 self.update_identities(repo, &remote).await?;
1523
1524 self.complete_changes(repo, txn, channel, &to_pull, false)
1525 .await?;
1526 Ok(())
1527 }
1528
1529 pub async fn complete_changes<T: MutTxnT + TxnTExt + GraphIter>(
1530 &mut self,
1531 repo: &pijul_repository::Repository,
1532 txn: &T,
1533 local_channel: &mut ChannelRef<T>,
1534 changes: &[CS],
1535 full: bool,
1536 ) -> Result<(), anyhow::Error> {
1537 debug!("complete changes {:?}", changes);
1538 use libpijul::changestore::ChangeStore;
1539 let (send_hash, mut recv_hash) = tokio::sync::mpsc::unbounded_channel();
1540 let (mut send_sig, mut recv_sig) = tokio::sync::mpsc::channel(100);
1541 let mut self_ = std::mem::replace(self, RemoteRepo::None);
1542 let mut changes_dir = repo.changes_dir.clone();
1543
1544 let download_bar = ProgressBar::new(changes.len() as u64, DOWNLOAD_MESSAGE)?;
1545 let _completion_spinner = Spinner::new(COMPLETE_MESSAGE)?;
1546 let t: tokio::task::JoinHandle<Result<RemoteRepo, anyhow::Error>> =
1547 tokio::spawn(async move {
1548 self_
1549 .download_changes(
1550 download_bar,
1551 &mut recv_hash,
1552 &mut send_sig,
1553 &mut changes_dir,
1554 true,
1555 )
1556 .await?;
1557 Ok::<_, anyhow::Error>(self_)
1558 });
1559
1560 for c in changes {
1561 let c = if let CS::Change(c) = c { c } else { continue };
1562 let sc = c.into();
1563 if repo
1564 .changes
1565 .has_contents(*c, txn.get_internal(&sc)?.cloned())
1566 {
1567 debug!("has contents {:?}", c);
1568 continue;
1569 }
1570 if full {
1571 debug!("sending send_hash");
1572 send_hash.send(CS::Change(*c))?;
1573 debug!("sent");
1574 continue;
1575 }
1576 let change = if let Some(&i) = txn.get_internal(&sc)? {
1577 i
1578 } else {
1579 debug!("could not find internal for {:?}", sc);
1580 continue;
1581 };
1582 let v = libpijul::pristine::Vertex {
1584 change,
1585 start: libpijul::pristine::ChangePosition(0u64.into()),
1586 end: libpijul::pristine::ChangePosition(0u64.into()),
1587 };
1588 let channel = local_channel.read();
1589 let graph = txn.graph(&channel);
1590 for x in txn.iter_graph(graph, Some(&v))? {
1591 let (v, e) = x?;
1592 if v.change > change {
1593 break;
1594 } else if e.flag().is_alive_parent() {
1595 send_hash.send(CS::Change(*c))?;
1596 break;
1597 }
1598 }
1599 }
1600 debug!("dropping send_hash");
1601 std::mem::drop(send_hash);
1602 while recv_sig.recv().await.is_some() {}
1603 *self = t.await??;
1604 Ok(())
1605 }
1606
1607 pub async fn clone_channel<T: MutTxnTExt + TxnTExt + GraphIter + 'static>(
1608 &mut self,
1609 repo: &mut Repository,
1610 txn: &mut T,
1611 local_channel: &mut ChannelRef<T>,
1612 path: &[String],
1613 ) -> Result<(), anyhow::Error> {
1614 let (inodes, remote_changes) = if let Some(x) = self.update_changelist(txn, path).await? {
1615 x
1616 } else {
1617 bail!("Channel not found")
1618 };
1619 let mut pullable = Vec::new();
1620 {
1621 let rem = remote_changes.lock();
1622 for x in txn.iter_remote(&rem.remote, 0)? {
1623 let (_, p) = x?;
1624 pullable.push(CS::Change(p.a.into()))
1625 }
1626 }
1627 self.pull(repo, txn, local_channel, &pullable, &inodes, true)
1628 .await?;
1629 self.update_identities(repo, &remote_changes).await?;
1630
1631 self.complete_changes(repo, txn, local_channel, &pullable, false)
1632 .await?;
1633 Ok(())
1634 }
1635}
1636
1637static CHANGELIST_LINE: LazyLock<Regex> = LazyLock::new(|| {
1638 Regex::new(r#"(?P<num>[0-9]+)\.(?P<hash>[A-Za-z0-9]+)\.(?P<merkle>[A-Za-z0-9]+)(?P<tag>\.)?"#)
1639 .unwrap()
1640});
1641static PATHS_LINE: LazyLock<Regex> =
1642 LazyLock::new(|| Regex::new(r#"(?P<hash>[A-Za-z0-9]+)\.(?P<num>[0-9]+)"#).unwrap());
1643
1644enum ListLine {
1645 Change {
1646 n: u64,
1647 h: Hash,
1648 m: Merkle,
1649 tag: bool,
1650 },
1651 Position(Position<Hash>),
1652 Error(String),
1653}
1654
1655fn parse_line(data: &str) -> Result<ListLine, anyhow::Error> {
1656 debug!("data = {:?}", data);
1657 if let Some(caps) = CHANGELIST_LINE.captures(data) {
1658 if let (Some(h), Some(m)) = (
1659 Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()),
1660 Merkle::from_base32(caps.name("merkle").unwrap().as_str().as_bytes()),
1661 ) {
1662 return Ok(ListLine::Change {
1663 n: caps.name("num").unwrap().as_str().parse().unwrap(),
1664 h,
1665 m,
1666 tag: caps.name("tag").is_some(),
1667 });
1668 }
1669 }
1670 if data.starts_with("error:") {
1671 return Ok(ListLine::Error(data.split_at(6).1.to_string()));
1672 }
1673 if let Some(caps) = PATHS_LINE.captures(data) {
1674 return Ok(ListLine::Position(Position {
1675 change: Hash::from_base32(caps.name("hash").unwrap().as_str().as_bytes()).unwrap(),
1676 pos: ChangePosition(
1677 caps.name("num")
1678 .unwrap()
1679 .as_str()
1680 .parse::<u64>()
1681 .unwrap()
1682 .into(),
1683 ),
1684 }));
1685 }
1686 debug!("offending line: {:?}", data);
1687 bail!("Protocol error")
1688}
1689
1690fn remote_unrecs<T: TxnTExt + ChannelTxnT>(
1694 txn: &T,
1695 current_channel: &ChannelRef<T>,
1696 ours_ge_dichotomy: &[(u64, CS)],
1697 theirs_ge_dichotomy_set: &HashSet<CS>,
1698) -> Result<Vec<(u64, CS)>, anyhow::Error> {
1699 let mut remote_unrecs = Vec::new();
1700 for (n, hash) in ours_ge_dichotomy {
1701 debug!("ours_ge_dichotomy: {:?} {:?}", n, hash);
1702 if theirs_ge_dichotomy_set.contains(hash) {
1703 debug!("still present");
1705 continue;
1706 } else {
1707 let has_it = match hash {
1708 CS::Change(hash) => txn.get_revchanges(¤t_channel, &hash)?.is_some(),
1709 CS::State(state) => {
1710 let ch = current_channel.read();
1711 if let Some(n) = txn.channel_has_state(txn.states(&*ch), &state.into())? {
1712 txn.is_tagged(txn.tags(&*ch), n.into())?
1713 } else {
1714 false
1715 }
1716 }
1717 };
1718 if has_it {
1719 remote_unrecs.push((*n, *hash))
1720 } else {
1721 continue;
1723 }
1724 }
1725 }
1726 Ok(remote_unrecs)
1727}