1use crate::{
2 error::{Error, Result},
3 sync::{
4 cred::BasicAuthCredential,
5 remotes::push::push_raw,
6 remotes::push::{ProgressNotification, PushType},
7 RepoPath,
8 },
9 AsyncGitNotification, RemoteProgress,
10};
11use crossbeam_channel::{unbounded, Sender};
12use std::{
13 sync::{Arc, Mutex},
14 thread,
15};
16
17#[derive(Default, Clone, Debug)]
19pub struct PushRequest {
20 pub remote: String,
22 pub branch: String,
24 pub push_type: PushType,
26 pub force: bool,
28 pub delete: bool,
30 pub basic_credential: Option<BasicAuthCredential>,
32}
33
/// Marker stored in `AsyncPush::state` while a push is in flight.
///
/// It carries no data; only its presence (`Some`) or absence (`None`) is
/// inspected.
#[derive(Clone, Debug, Default)]
struct PushState {}
37
38pub struct AsyncPush {
40 state: Arc<Mutex<Option<PushState>>>,
41 last_result: Arc<Mutex<Option<String>>>,
42 progress: Arc<Mutex<Option<ProgressNotification>>>,
43 sender: Sender<AsyncGitNotification>,
44 repo: RepoPath,
45}
46
47impl AsyncPush {
48 pub fn new(
50 repo: RepoPath,
51 sender: &Sender<AsyncGitNotification>,
52 ) -> Self {
53 Self {
54 repo,
55 state: Arc::new(Mutex::new(None)),
56 last_result: Arc::new(Mutex::new(None)),
57 progress: Arc::new(Mutex::new(None)),
58 sender: sender.clone(),
59 }
60 }
61
62 pub fn is_pending(&self) -> Result<bool> {
64 let state = self.state.lock()?;
65 Ok(state.is_some())
66 }
67
68 pub fn last_result(&self) -> Result<Option<String>> {
70 let res = self.last_result.lock()?;
71 Ok(res.clone())
72 }
73
74 pub fn progress(&self) -> Result<Option<RemoteProgress>> {
76 let res = self.progress.lock()?;
77 Ok(res.as_ref().map(|progress| progress.clone().into()))
78 }
79
80 pub fn request(&self, params: PushRequest) -> Result<()> {
82 log::trace!("request");
83
84 if self.is_pending()? {
85 return Ok(());
86 }
87
88 self.set_request(¶ms)?;
89 RemoteProgress::set_progress(&self.progress, None)?;
90
91 let arc_state = Arc::clone(&self.state);
92 let arc_res = Arc::clone(&self.last_result);
93 let arc_progress = Arc::clone(&self.progress);
94 let sender = self.sender.clone();
95 let repo = self.repo.clone();
96
97 thread::spawn(move || {
98 let (progress_sender, receiver) = unbounded();
99
100 let handle = RemoteProgress::spawn_receiver_thread(
101 AsyncGitNotification::Push,
102 sender.clone(),
103 receiver,
104 arc_progress,
105 );
106
107 let res = push_raw(
108 &repo,
109 params.remote.as_str(),
110 params.branch.as_str(),
111 params.push_type,
112 params.force,
113 params.delete,
114 params.basic_credential.clone(),
115 Some(progress_sender.clone()),
116 );
117
118 progress_sender
119 .send(ProgressNotification::Done)
120 .expect("closing send failed");
121
122 handle.join().expect("joining thread failed");
123
124 Self::set_result(&arc_res, res).expect("result error");
125
126 Self::clear_request(&arc_state).expect("clear error");
127
128 sender
129 .send(AsyncGitNotification::Push)
130 .expect("error sending push");
131 });
132
133 Ok(())
134 }
135
136 fn set_request(&self, _params: &PushRequest) -> Result<()> {
137 let mut state = self.state.lock()?;
138
139 if state.is_some() {
140 return Err(Error::Generic("pending request".into()));
141 }
142
143 *state = Some(PushState {});
144
145 Ok(())
146 }
147
148 fn clear_request(
149 state: &Arc<Mutex<Option<PushState>>>,
150 ) -> Result<()> {
151 let mut state = state.lock()?;
152
153 *state = None;
154
155 Ok(())
156 }
157
158 fn set_result(
159 arc_result: &Arc<Mutex<Option<String>>>,
160 res: Result<()>,
161 ) -> Result<()> {
162 let mut last_res = arc_result.lock()?;
163
164 *last_res = match res {
165 Ok(()) => None,
166 Err(e) => {
167 log::error!("push error: {e}");
168 Some(e.to_string())
169 }
170 };
171
172 Ok(())
173 }
174}