1use std::{
2 sync::{Arc, Mutex},
3 thread,
4};
5
6use crossbeam_channel::{unbounded, Sender};
7
8use crate::{
9 error::{Error, Result},
10 sync::{
11 cred::BasicAuthCredential,
12 remotes::{fetch, push::ProgressNotification},
13 RepoPath,
14 },
15 AsyncGitNotification, RemoteProgress,
16};
17
18#[derive(Default, Clone, Debug)]
20pub struct FetchRequest {
21 pub remote: String,
23 pub branch: String,
25 pub basic_credential: Option<BasicAuthCredential>,
27}
28
29#[derive(Default, Clone, Debug)]
32struct FetchState {}
33
34pub struct AsyncPull {
36 state: Arc<Mutex<Option<FetchState>>>,
37 last_result: Arc<Mutex<Option<(usize, String)>>>,
38 progress: Arc<Mutex<Option<ProgressNotification>>>,
39 sender: Sender<AsyncGitNotification>,
40 repo: RepoPath,
41}
42
43impl AsyncPull {
44 pub fn new(repo: RepoPath, sender: &Sender<AsyncGitNotification>) -> Self {
46 Self {
47 repo,
48 state: Arc::new(Mutex::new(None)),
49 last_result: Arc::new(Mutex::new(None)),
50 progress: Arc::new(Mutex::new(None)),
51 sender: sender.clone(),
52 }
53 }
54
55 pub fn is_pending(&self) -> Result<bool> {
57 let state = self.state.lock()?;
58 Ok(state.is_some())
59 }
60
61 pub fn last_result(&self) -> Result<Option<(usize, String)>> {
63 let res = self.last_result.lock()?;
64 Ok(res.clone())
65 }
66
67 pub fn progress(&self) -> Result<Option<RemoteProgress>> {
69 let res = self.progress.lock()?;
70 Ok(res.as_ref().map(|progress| progress.clone().into()))
71 }
72
73 pub fn request(&mut self, params: FetchRequest) -> Result<()> {
75 log::trace!("request");
76
77 if self.is_pending()? {
78 return Ok(());
79 }
80
81 self.set_request(¶ms)?;
82 RemoteProgress::set_progress(&self.progress, None)?;
83
84 let arc_state = Arc::clone(&self.state);
85 let arc_res = Arc::clone(&self.last_result);
86 let arc_progress = Arc::clone(&self.progress);
87 let sender = self.sender.clone();
88 let repo = self.repo.clone();
89
90 thread::spawn(move || {
91 let (progress_sender, receiver) = unbounded();
92
93 let handle = RemoteProgress::spawn_receiver_thread(
94 AsyncGitNotification::Pull,
95 sender.clone(),
96 receiver,
97 arc_progress,
98 );
99
100 let res = fetch(
101 &repo,
102 ¶ms.branch,
103 params.basic_credential,
104 Some(progress_sender.clone()),
105 );
106
107 progress_sender
108 .send(ProgressNotification::Done)
109 .expect("closing send failed");
110
111 handle.join().expect("joining thread failed");
112
113 Self::set_result(&arc_res, res).expect("result error");
114
115 Self::clear_request(&arc_state).expect("clear error");
116
117 sender
118 .send(AsyncGitNotification::Pull)
119 .expect("AsyncNotification error");
120 });
121
122 Ok(())
123 }
124
125 fn set_request(&self, _params: &FetchRequest) -> Result<()> {
126 let mut state = self.state.lock()?;
127
128 if state.is_some() {
129 return Err(Error::Generic("pending request".into()));
130 }
131
132 *state = Some(FetchState {});
133
134 Ok(())
135 }
136
137 fn clear_request(state: &Arc<Mutex<Option<FetchState>>>) -> Result<()> {
138 let mut state = state.lock()?;
139
140 *state = None;
141
142 Ok(())
143 }
144
145 fn set_result(
146 arc_result: &Arc<Mutex<Option<(usize, String)>>>,
147 res: Result<usize>,
148 ) -> Result<()> {
149 let mut last_res = arc_result.lock()?;
150
151 *last_res = match res {
152 Ok(bytes) => Some((bytes, String::new())),
153 Err(e) => {
154 log::error!("fetch error: {}", e);
155 Some((0, e.to_string()))
156 }
157 };
158
159 Ok(())
160 }
161}