gnostr_asyncgit/
pull.rs

1use std::{
2	sync::{Arc, Mutex},
3	thread,
4};
5
6use crossbeam_channel::{Sender, unbounded};
7
8use crate::{
9	AsyncGitNotification, RemoteProgress,
10	error::{Error, Result},
11	sync::{
12		RepoPath,
13		cred::BasicAuthCredential,
14		remotes::{fetch, push::ProgressNotification},
15	},
16};
17
18///
19#[derive(Default, Clone, Debug)]
20pub struct FetchRequest {
21	///
22	pub remote: String,
23	///
24	pub branch: String,
25	///
26	pub basic_credential: Option<BasicAuthCredential>,
27}
28
29//TODO: since this is empty we can go with a simple AtomicBool to
30// mark that we are fetching or not
31#[derive(Default, Clone, Debug)]
32struct FetchState {}
33
34///
35pub struct AsyncPull {
36	state: Arc<Mutex<Option<FetchState>>>,
37	last_result: Arc<Mutex<Option<(usize, String)>>>,
38	progress: Arc<Mutex<Option<ProgressNotification>>>,
39	sender: Sender<AsyncGitNotification>,
40	repo: RepoPath,
41}
42
43impl AsyncPull {
44	///
45	pub fn new(
46		repo: RepoPath,
47		sender: &Sender<AsyncGitNotification>,
48	) -> Self {
49		Self {
50			repo,
51			state: Arc::new(Mutex::new(None)),
52			last_result: Arc::new(Mutex::new(None)),
53			progress: Arc::new(Mutex::new(None)),
54			sender: sender.clone(),
55		}
56	}
57
58	///
59	pub fn is_pending(&self) -> Result<bool> {
60		let state = self.state.lock()?;
61		Ok(state.is_some())
62	}
63
64	///
65	pub fn last_result(&self) -> Result<Option<(usize, String)>> {
66		let res = self.last_result.lock()?;
67		Ok(res.clone())
68	}
69
70	///
71	pub fn progress(&self) -> Result<Option<RemoteProgress>> {
72		let res = self.progress.lock()?;
73		Ok(res.as_ref().map(|progress| progress.clone().into()))
74	}
75
76	///
77	pub fn request(&mut self, params: FetchRequest) -> Result<()> {
78		log::trace!("request");
79
80		if self.is_pending()? {
81			return Ok(());
82		}
83
84		self.set_request(&params)?;
85		RemoteProgress::set_progress(&self.progress, None)?;
86
87		let arc_state = Arc::clone(&self.state);
88		let arc_res = Arc::clone(&self.last_result);
89		let arc_progress = Arc::clone(&self.progress);
90		let sender = self.sender.clone();
91		let repo = self.repo.clone();
92
93		thread::spawn(move || {
94			let (progress_sender, receiver) = unbounded();
95
96			let handle = RemoteProgress::spawn_receiver_thread(
97				AsyncGitNotification::Pull,
98				sender.clone(),
99				receiver,
100				arc_progress,
101			);
102
103			let res = fetch(
104				&repo,
105				&params.branch,
106				params.basic_credential,
107				Some(progress_sender.clone()),
108			);
109
110			progress_sender
111				.send(ProgressNotification::Done)
112				.expect("closing send failed");
113
114			handle.join().expect("joining thread failed");
115
116			Self::set_result(&arc_res, res).expect("result error");
117
118			Self::clear_request(&arc_state).expect("clear error");
119
120			sender
121				.send(AsyncGitNotification::Pull)
122				.expect("AsyncNotification error");
123		});
124
125		Ok(())
126	}
127
128	fn set_request(&self, _params: &FetchRequest) -> Result<()> {
129		let mut state = self.state.lock()?;
130
131		if state.is_some() {
132			return Err(Error::Generic("pending request".into()));
133		}
134
135		*state = Some(FetchState {});
136
137		Ok(())
138	}
139
140	fn clear_request(
141		state: &Arc<Mutex<Option<FetchState>>>,
142	) -> Result<()> {
143		let mut state = state.lock()?;
144
145		*state = None;
146
147		Ok(())
148	}
149
150	fn set_result(
151		arc_result: &Arc<Mutex<Option<(usize, String)>>>,
152		res: Result<usize>,
153	) -> Result<()> {
154		let mut last_res = arc_result.lock()?;
155
156		*last_res = match res {
157			Ok(bytes) => Some((bytes, String::new())),
158			Err(e) => {
159				log::error!("fetch error: {}", e);
160				Some((0, e.to_string()))
161			}
162		};
163
164		Ok(())
165	}
166}