1use std::collections::HashSet;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::bail;
6use libpijul::pristine::{Hash, Merkle, MutTxnT, Position, TxnT};
7use libpijul::*;
8use log::debug;
9
10use crate::CS;
11use pijul_interaction::ProgressBar;
12
13#[derive(Clone)]
14pub struct Local {
15 pub channel: String,
16 pub root: std::path::PathBuf,
17 pub changes_dir: std::path::PathBuf,
18 pub pristine: Arc<libpijul::pristine::sanakirja::Pristine>,
19 pub name: String,
20}
21
22pub fn get_state<T: TxnTExt>(
23 txn: &T,
24 channel: &libpijul::pristine::ChannelRef<T>,
25 mid: Option<u64>,
26) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
27 if let Some(x) = txn.reverse_log(&*channel.read(), mid)?.next() {
28 let (n, (_, m)) = x?;
29 if let Some(m2) = txn
30 .rev_iter_tags(txn.tags(&*channel.read()), Some(n.into()))?
31 .next()
32 {
33 let (_, m2) = m2?;
34 Ok(Some((n, m.into(), m2.b.into())))
35 } else {
36 Ok(Some((n, m.into(), Merkle::zero())))
37 }
38 } else {
39 Ok(None)
40 }
41}
42
43impl Local {
44 pub fn get_state(
45 &mut self,
46 mid: Option<u64>,
47 ) -> Result<Option<(u64, Merkle, Merkle)>, anyhow::Error> {
48 let txn = self.pristine.txn_begin()?;
49 let channel = txn.load_channel(&self.channel)?.unwrap();
50 Ok(get_state(&txn, &channel, mid)?)
51 }
52
53 pub fn get_id(&self) -> Result<libpijul::pristine::RemoteId, anyhow::Error> {
54 let txn = self.pristine.txn_begin()?;
55 if let Some(channel) = txn.load_channel(&self.channel)? {
56 Ok(*txn.id(&*channel.read()).unwrap())
57 } else {
58 Err(anyhow::anyhow!(
59 "Channel {} does not exist in repository {}",
60 self.channel,
61 self.name
62 ))
63 }
64 }
65
66 pub fn download_changelist<
67 A,
68 F: FnMut(&mut A, u64, Hash, Merkle, bool) -> Result<(), anyhow::Error>,
69 >(
70 &mut self,
71 mut f: F,
72 a: &mut A,
73 from: u64,
74 paths: &[String],
75 ) -> Result<HashSet<Position<Hash>>, anyhow::Error> {
76 let store = libpijul::changestore::filesystem::FileSystem::from_root(
77 &self.root,
78 pijul_repository::max_files()?,
79 );
80 let remote_txn = self.pristine.txn_begin()?;
81 let remote_channel = if let Some(channel) = remote_txn.load_channel(&self.channel)? {
82 channel
83 } else {
84 debug!(
85 "Local::download_changelist found no channel named {:?}",
86 self.channel
87 );
88 bail!("No channel {} found for remote {}", self.name, self.channel)
89 };
90 let mut paths_ = HashSet::new();
91 let mut result = HashSet::new();
92 for s in paths {
93 if let Ok((p, _ambiguous)) = remote_txn.follow_oldest_path(&store, &remote_channel, s) {
94 debug!("p = {:?}", p);
95 result.insert(Position {
96 change: remote_txn.get_external(&p.change)?.unwrap().into(),
97 pos: p.pos,
98 });
99 paths_.insert(p);
100 paths_.extend(
101 libpijul::fs::iter_graph_descendants(&remote_txn, &remote_channel.read(), p)?
102 .map(|x| x.unwrap()),
103 );
104 }
105 }
106 debug!("paths_ = {:?}", paths_);
107 debug!("from = {:?}", from);
108
109 let rem = remote_channel.read();
110 let tags: Vec<u64> = remote_txn
111 .iter_tags(remote_txn.tags(&*rem), from)?
112 .map(|k| (*k.unwrap().0).into())
113 .collect();
114 let mut tagsi = 0;
115
116 for x in remote_txn.log(&*rem, from)? {
117 let (n, (h, m)) = x?;
118 assert!(n >= from);
119 let h_int = remote_txn.get_internal(h)?.unwrap();
120 if paths_.is_empty()
121 || paths_.iter().any(|x| {
122 remote_txn
123 .get_touched_files(x, Some(h_int))
124 .unwrap()
125 .is_some()
126 })
127 {
128 debug!("put_remote {:?} {:?} {:?}", n, h, m);
129 if tags.get(tagsi) == Some(&n) {
130 f(a, n, h.into(), m.into(), true)?;
131 tagsi += 1;
132 } else {
133 f(a, n, h.into(), m.into(), false)?;
134 }
135 }
136 }
137 Ok(result)
138 }
139
140 pub fn upload_changes(
141 &mut self,
142 progress_bar: ProgressBar,
143 mut local: PathBuf,
144 to_channel: Option<&str>,
145 changes: &[CS],
146 ) -> Result<(), anyhow::Error> {
147 let store = libpijul::changestore::filesystem::FileSystem::from_root(
148 &self.root,
149 pijul_repository::max_files()?,
150 );
151 let txn = self.pristine.arc_txn_begin()?;
152 let channel = txn
153 .write()
154 .open_or_create_channel(to_channel.unwrap_or(&self.channel))?;
155 for c in changes {
156 match c {
157 CS::Change(c) => {
158 libpijul::changestore::filesystem::push_filename(&mut local, &c);
159 libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
160 }
161 CS::State(c) => {
162 libpijul::changestore::filesystem::push_tag_filename(&mut local, &c);
163 libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);
164 }
165 }
166 std::fs::create_dir_all(&self.changes_dir.parent().unwrap())?;
167 debug!("hard link {:?} {:?}", local, self.changes_dir);
168 if std::fs::metadata(&self.changes_dir).is_err() {
169 if std::fs::hard_link(&local, &self.changes_dir).is_err() {
170 std::fs::copy(&local, &self.changes_dir)?;
171 }
172 }
173 debug!("hard link done");
174 libpijul::changestore::filesystem::pop_filename(&mut local);
175 libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
176 }
177 let repo = libpijul::working_copy::filesystem::FileSystem::from_root(&self.root);
178 upload_changes(progress_bar, &store, &mut *txn.write(), &channel, changes)?;
179 libpijul::output::output_repository_no_pending(
180 &repo,
181 &store,
182 &txn,
183 &channel,
184 "",
185 true,
186 None,
187 std::thread::available_parallelism()?.get(),
188 0,
189 )?;
190 txn.commit()?;
191 Ok(())
192 }
193
194 pub async fn download_changes(
195 &mut self,
196 progress_bar: ProgressBar,
197 hashes: &mut tokio::sync::mpsc::UnboundedReceiver<CS>,
198 send: &mut tokio::sync::mpsc::Sender<(CS, bool)>,
199 mut path: &mut PathBuf,
200 ) -> Result<(), anyhow::Error> {
201 while let Some(c) = hashes.recv().await {
202 match c {
203 CS::Change(c) => {
204 libpijul::changestore::filesystem::push_filename(&mut self.changes_dir, &c);
205 libpijul::changestore::filesystem::push_filename(&mut path, &c);
206 }
207 CS::State(c) => {
208 libpijul::changestore::filesystem::push_tag_filename(&mut self.changes_dir, &c);
209 libpijul::changestore::filesystem::push_tag_filename(&mut path, &c);
210 }
211 }
212 progress_bar.inc(1);
213
214 if std::fs::metadata(&path).is_ok() {
215 debug!("metadata {:?} ok", path);
216 libpijul::changestore::filesystem::pop_filename(&mut path);
217 send.send((c, false)).await?;
218 continue;
219 }
220 std::fs::create_dir_all(&path.parent().unwrap())?;
221 if std::fs::hard_link(&self.changes_dir, &path).is_err() {
222 std::fs::copy(&self.changes_dir, &path)?;
223 }
224 libpijul::changestore::filesystem::pop_filename(&mut self.changes_dir);
225 libpijul::changestore::filesystem::pop_filename(&mut path);
226 send.send((c, true)).await?;
227 }
228 Ok(())
229 }
230
231 pub async fn update_identities(
232 &mut self,
233 _rev: Option<u64>,
234 mut path: PathBuf,
235 ) -> Result<u64, anyhow::Error> {
236 let mut other_path = self.root.join(DOT_DIR);
237 other_path.push("identities");
238 let r = if let Ok(r) = std::fs::read_dir(&other_path) {
239 r
240 } else {
241 return Ok(0);
242 };
243 std::fs::create_dir_all(&path)?;
244 for id in r {
245 let id = id?;
246 let m = id.metadata()?;
247 let p = id.path();
248 path.push(p.file_name().unwrap());
249 if let Ok(ml) = std::fs::metadata(&path) {
250 if ml.modified()? < m.modified()? {
251 std::fs::remove_file(&path)?;
252 } else {
253 path.pop();
254 continue;
255 }
256 }
257 if std::fs::hard_link(&p, &path).is_err() {
258 std::fs::copy(&p, &path)?;
259 }
260 debug!("hard link done");
261 path.pop();
262 }
263 Ok(0)
264 }
265}
266
267pub fn upload_changes<T: MutTxnTExt + 'static, C: libpijul::changestore::ChangeStore>(
268 progress_bar: ProgressBar,
269 store: &C,
270 txn: &mut T,
271 channel: &libpijul::pristine::ChannelRef<T>,
272 changes: &[CS],
273) -> Result<(), anyhow::Error> {
274 let mut ws = libpijul::ApplyWorkspace::new();
275 let mut channel = channel.write();
276 for c in changes {
277 match c {
278 CS::Change(c) => {
279 txn.apply_change_ws(store, &mut *channel, c, &mut ws)?;
280 }
281 CS::State(c) => {
282 if let Some(n) = txn.channel_has_state(txn.states(&*channel), &c.into())? {
283 let tags = txn.tags_mut(&mut *channel);
284 txn.put_tags(tags, n.into(), c)?;
285 } else {
286 bail!(
287 "Cannot add tag {}: channel {:?} does not have that state",
288 c.to_base32(),
289 txn.name(&*channel)
290 )
291 }
292 }
293 }
294 progress_bar.inc(1);
295 }
296 Ok(())
297}