1use std::{
2 sync::{Arc, Mutex},
3 thread,
4};
5
6use crossbeam_channel::{Sender, unbounded};
7
8use crate::{
9 AsyncGitNotification, RemoteProgress,
10 error::{Error, Result},
11 sync::{
12 RepoPath,
13 cred::BasicAuthCredential,
14 remotes::{fetch, push::ProgressNotification},
15 },
16};
17
18#[derive(Default, Clone, Debug)]
20pub struct FetchRequest {
21 pub remote: String,
23 pub branch: String,
25 pub basic_credential: Option<BasicAuthCredential>,
27}
28
/// Data-less marker stored in `AsyncPull::state` while a request is in
/// flight; the slot holds `Some(FetchState {})` when pending and `None`
/// when idle.
#[derive(Default, Clone, Debug)]
struct FetchState {}
33
34pub struct AsyncPull {
36 state: Arc<Mutex<Option<FetchState>>>,
37 last_result: Arc<Mutex<Option<(usize, String)>>>,
38 progress: Arc<Mutex<Option<ProgressNotification>>>,
39 sender: Sender<AsyncGitNotification>,
40 repo: RepoPath,
41}
42
43impl AsyncPull {
44 pub fn new(
46 repo: RepoPath,
47 sender: &Sender<AsyncGitNotification>,
48 ) -> Self {
49 Self {
50 repo,
51 state: Arc::new(Mutex::new(None)),
52 last_result: Arc::new(Mutex::new(None)),
53 progress: Arc::new(Mutex::new(None)),
54 sender: sender.clone(),
55 }
56 }
57
58 pub fn is_pending(&self) -> Result<bool> {
60 let state = self.state.lock()?;
61 Ok(state.is_some())
62 }
63
64 pub fn last_result(&self) -> Result<Option<(usize, String)>> {
66 let res = self.last_result.lock()?;
67 Ok(res.clone())
68 }
69
70 pub fn progress(&self) -> Result<Option<RemoteProgress>> {
72 let res = self.progress.lock()?;
73 Ok(res.as_ref().map(|progress| progress.clone().into()))
74 }
75
76 pub fn request(&mut self, params: FetchRequest) -> Result<()> {
78 log::trace!("request");
79
80 if self.is_pending()? {
81 return Ok(());
82 }
83
84 self.set_request(¶ms)?;
85 RemoteProgress::set_progress(&self.progress, None)?;
86
87 let arc_state = Arc::clone(&self.state);
88 let arc_res = Arc::clone(&self.last_result);
89 let arc_progress = Arc::clone(&self.progress);
90 let sender = self.sender.clone();
91 let repo = self.repo.clone();
92
93 thread::spawn(move || {
94 let (progress_sender, receiver) = unbounded();
95
96 let handle = RemoteProgress::spawn_receiver_thread(
97 AsyncGitNotification::Pull,
98 sender.clone(),
99 receiver,
100 arc_progress,
101 );
102
103 let res = fetch(
104 &repo,
105 ¶ms.branch,
106 params.basic_credential,
107 Some(progress_sender.clone()),
108 );
109
110 progress_sender
111 .send(ProgressNotification::Done)
112 .expect("closing send failed");
113
114 handle.join().expect("joining thread failed");
115
116 Self::set_result(&arc_res, res).expect("result error");
117
118 Self::clear_request(&arc_state).expect("clear error");
119
120 sender
121 .send(AsyncGitNotification::Pull)
122 .expect("AsyncNotification error");
123 });
124
125 Ok(())
126 }
127
128 fn set_request(&self, _params: &FetchRequest) -> Result<()> {
129 let mut state = self.state.lock()?;
130
131 if state.is_some() {
132 return Err(Error::Generic("pending request".into()));
133 }
134
135 *state = Some(FetchState {});
136
137 Ok(())
138 }
139
140 fn clear_request(
141 state: &Arc<Mutex<Option<FetchState>>>,
142 ) -> Result<()> {
143 let mut state = state.lock()?;
144
145 *state = None;
146
147 Ok(())
148 }
149
150 fn set_result(
151 arc_result: &Arc<Mutex<Option<(usize, String)>>>,
152 res: Result<usize>,
153 ) -> Result<()> {
154 let mut last_res = arc_result.lock()?;
155
156 *last_res = match res {
157 Ok(bytes) => Some((bytes, String::new())),
158 Err(e) => {
159 log::error!("fetch error: {}", e);
160 Some((0, e.to_string()))
161 }
162 };
163
164 Ok(())
165 }
166}