pijul_remote/
local.rs

1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::bail;
6use libpijul::pristine::{Hash, Merkle, MutTxnT, Position, TxnT};
7use libpijul::*;
8use log::debug;
9
10use crate::CS;
11use pijul_interaction::ProgressBar;
12
13#[derive(Clone)]
14pub struct Local {
15    pub channel: String,
16    pub root: std::path::PathBuf,
17    pub changes_dir: std::path::PathBuf,
18    pub pristine: Arc<libpijul::pristine::sanakirja::Pristine>,
19    pub name: String,
20}
21
22pub fn get_state<T: TxnTExt>(
23    txn: &T,
24    channel: &libpijul::pristine::ChannelRef<T>,
25    mid: Option<u64>,
26) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
27    if let Some(x) = txn.reverse_log(&*channel.read(), mid)?.next() {
28        let (n, (_, m)) = x?;
29        if let Some(m2) = txn
30            .rev_iter_tags(txn.tags(&*channel.read()), Some(n.into()))?
31            .next()
32        {
33            let (_, m2) = m2?;
34            Ok(Some((n, m.into(), m2.b.into())))
35        } else {
36            Ok(Some((n, m.into(), Merkle::zero())))
37        }
38    } else {
39        Ok(None)
40    }
41}
42
43impl Local {
44    pub fn get_state(
45        &mut self,
46        mid: Option<u64>,
47    ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
48        let txn = self.pristine.txn_begin()?;
49        let channel = txn.load_channel(&self.channel)?.unwrap();
50        Ok(get_state(&txn, &channel, mid)?)
51    }
52
53    pub fn get_id(&self) -> Result<libpijul::pristine::RemoteId, anyhow::Error> {
54        let txn = self.pristine.txn_begin()?;
55        if let Some(channel) = txn.load_channel(&self.channel)? {
56            Ok(*txn.id(&*channel.read()).unwrap())
57        } else {
58            Err(anyhow::anyhow!(
59                "Channel {} does not exist in repository {}",
60                self.channel,
61                self.name
62            ))
63        }
64    }
65
66    pub fn download_changelist<
67        A,
68        F: FnMut(&mut A, u64, Hash, Merkle, bool) -> Result<(), anyhow::Error>,
69    >(
70        &mut self,
71        mut f: F,
72        a: &mut A,
73        from: u64,
74        paths: &[String],
75    ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
76        let store = libpijul::changestore::filesystem::FileSystem::from_root(
77            &self.root,
78            pijul_repository::max_files()?,
79        );
80        let remote_txn = self.pristine.txn_begin()?;
81        let remote_channel = if let Some(channel) = remote_txn.load_channel(&self.channel)? {
82            channel
83        } else {
84            debug!(
85                "Local::download_changelist found no channel named {:?}",
86                self.channel
87            );
88            bail!("No channel {} found for remote {}", self.name, self.channel)
89        };
90        let mut paths_ = HashSet::new();
91        let mut result = HashSet::new();
92        for s in paths {
93            if let Ok((p, _ambiguous)) = remote_txn.follow_oldest_path(&store, &remote_channel, s) {
94                debug!("p = {:?}", p);
95                result.insert(Position {
96                    change: remote_txn.get_external(&p.change)?.unwrap().into(),
97                    pos: p.pos,
98                });
99                paths_.insert(p);
100                paths_.extend(
101                    libpijul::fs::iter_graph_descendants(&remote_txn, &remote_channel.read(), p)?
102                        .map(|x| x.unwrap()),
103                );
104            }
105        }
106        debug!("paths_ = {:?}", paths_);
107        debug!("from = {:?}", from);
108
109        let rem = remote_channel.read();
110        let tags: Vec<u64> = remote_txn
111            .iter_tags(remote_txn.tags(&*rem), from)?
112            .map(|k| (*k.unwrap().0).into())
113            .collect();
114        let mut tagsi = 0;
115
116        for x in remote_txn.log(&*rem, from)? {
117            let (n, (h, m)) = x?;
118            assert!(n >= from);
119            let h_int = remote_txn.get_internal(h)?.unwrap();
120            if paths_.is_empty()
121                || paths_.iter().any(|x| {
122                    remote_txn
123                        .get_touched_files(x, Some(h_int))
124                        .unwrap()
125                        .is_some()
126                })
127            {
128                debug!("put_remote {:?} {:?} {:?}", n, h, m);
129                if tags.get(tagsi) == Some(&n) {
130                    f(a, n, h.into(), m.into(), true)?;
131                    tagsi += 1;
132                } else {
133                    f(a, n, h.into(), m.into(), false)?;
134                }
135            }
136        }
137        Ok(result)
138    }
139
140    pub fn upload_changes(
141        &mut self,
142        progress_bar: ProgressBar,
143        mut local: PathBuf,
144        to_channel: Option<&str>,
145        changes: &[CS],
146    ) -> Result<(), anyhow::Error> {
147        let store = libpijul::changestore::filesystem::FileSystem::from_root(
148            &self.root,
149            pijul_repository::max_files()?,
150        );
151        let txn = self.pristine.arc_txn_begin()?;
152        let channel = txn
153            .write()
154            .open_or_create_channel(to_channel.unwrap_or(&self.channel))?;
155        for c in changes {
156            match c {
157                CS::Change(c) => {
158                    libpijul::changestore::filesystem::push_filename(&mut local, &c);
159                    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
160                }
161                CS::State(c) => {
162                    libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
163                    libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);
164                }
165            }
166            std::fs::create_dir_all(&self.changes_dir.parent().unwrap())?;
167            debug!("hard link {:?} {:?}", local, self.changes_dir);
168            if std::fs::metadata(&self.changes_dir).is_err() {
169                if std::fs::hard_link(&local, &self.changes_dir).is_err() {
170                    std::fs::copy(&local, &self.changes_dir)?;
171                }
172            }
173            debug!("hard link done");
174            libpijul::changestore::filesystem::pop_filename(&mut local);
175            libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
176        }
177        let repo = libpijul::working_copy::filesystem::FileSystem::from_root(&self.root);
178        upload_changes(progress_bar, &store, &mut *txn.write(), &channel, changes)?;
179        libpijul::output::output_repository_no_pending(
180            &repo,
181            &store,
182            &txn,
183            &channel,
184            "",
185            true,
186            None,
187            std::thread::available_parallelism()?.get(),
188            0,
189        )?;
190        txn.commit()?;
191        Ok(())
192    }
193
194    pub async fn download_changes(
195        &mut self,
196        progress_bar: ProgressBar,
197        hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
198        send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
199        mut path: &mut PathBuf,
200    ) -> Result<(), anyhow::Error> {
201        while let Some(c) = hashes.recv().await {
202            match c {
203                CS::Change(c) => {
204                    libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
205                    libpijul::changestore::filesystem::push_filename(&mut path, &c);
206                }
207                CS::State(c) => {
208                    libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);
209                    libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
210                }
211            }
212            progress_bar.inc(1);
213
214            if std::fs::metadata(&path).is_ok() {
215                debug!("metadata {:?} ok", path);
216                libpijul::changestore::filesystem::pop_filename(&mut path);
217                send.send((c, false)).await?;
218                continue;
219            }
220            std::fs::create_dir_all(&path.parent().unwrap())?;
221            if std::fs::hard_link(&self.changes_dir, &path).is_err() {
222                std::fs::copy(&self.changes_dir, &path)?;
223            }
224            libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
225            libpijul::changestore::filesystem::pop_filename(&mut path);
226            send.send((c, true)).await?;
227        }
228        Ok(())
229    }
230
231    pub async fn update_identities(
232        &mut self,
233        _rev: Option<u64>,
234        mut path: PathBuf,
235    ) -> Result<u64, anyhow::Error> {
236        let mut other_path = self.root.join(DOT_DIR);
237        other_path.push("identities");
238        let r = if let Ok(r) = std::fs::read_dir(&other_path) {
239            r
240        } else {
241            return Ok(0);
242        };
243        std::fs::create_dir_all(&path)?;
244        for id in r {
245            let id = id?;
246            let m = id.metadata()?;
247            let p = id.path();
248            path.push(p.file_name().unwrap());
249            if let Ok(ml) = std::fs::metadata(&path) {
250                if ml.modified()? < m.modified()? {
251                    std::fs::remove_file(&path)?;
252                } else {
253                    path.pop();
254                    continue;
255                }
256            }
257            if std::fs::hard_link(&p, &path).is_err() {
258                std::fs::copy(&p, &path)?;
259            }
260            debug!("hard link done");
261            path.pop();
262        }
263        Ok(0)
264    }
265}
266
267pub fn upload_changes<T: MutTxnTExt + 'static, C: libpijul::changestore::ChangeStore>(
268    progress_bar: ProgressBar,
269    store: &C,
270    txn: &mut T,
271    channel: &libpijul::pristine::ChannelRef<T>,
272    changes: &[CS],
273) -> Result<(), anyhow::Error> {
274    let mut ws = libpijul::ApplyWorkspace::new();
275    let mut channel = channel.write();
276    for c in changes {
277        match c {
278            CS::Change(c) => {
279                txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;
280            }
281            CS::State(c) => {
282                if let Some(n) = txn.channel_has_state(txn.states(&*channel), &c.into())? {
283                    let tags = txn.tags_mut(&mut *channel);
284                    txn.put_tags(tags, n.into(), c)?;
285                } else {
286                    bail!(
287                        "Cannot add tag {}: channel {:?} does not have that state",
288                        c.to_base32(),
289                        txn.name(&*channel)
290                    )
291                }
292            }
293        }
294        progress_bar.inc(1);
295    }
296    Ok(())
297}