1use std::{
2 sync::{Arc, Mutex},
3 thread,
4};
5
6use crossbeam_channel::{unbounded, Sender};
7
8use crate::{
9 error::{Error, Result},
10 sync::{
11 cred::BasicAuthCredential,
12 remotes::push::{push_raw, ProgressNotification, PushType},
13 RepoPath,
14 },
15 AsyncGitNotification, RemoteProgress,
16};
17
18#[derive(Default, Clone, Debug)]
20pub struct PushRequest {
21 pub remote: String,
23 pub branch: String,
25 pub push_type: PushType,
27 pub force: bool,
29 pub delete: bool,
31 pub basic_credential: Option<BasicAuthCredential>,
33}
34
/// Marker stored in `AsyncPush::state` while a push is in flight.
///
/// It carries no data; only its presence (`Some`) or absence (`None`) matters.
#[derive(Clone, Debug, Default)]
struct PushState {}
39
40pub struct AsyncPush {
42 state: Arc<Mutex<Option<PushState>>>,
43 last_result: Arc<Mutex<Option<String>>>,
44 progress: Arc<Mutex<Option<ProgressNotification>>>,
45 sender: Sender<AsyncGitNotification>,
46 repo: RepoPath,
47}
48
49impl AsyncPush {
50 pub fn new(repo: RepoPath, sender: &Sender<AsyncGitNotification>) -> Self {
52 Self {
53 repo,
54 state: Arc::new(Mutex::new(None)),
55 last_result: Arc::new(Mutex::new(None)),
56 progress: Arc::new(Mutex::new(None)),
57 sender: sender.clone(),
58 }
59 }
60
61 pub fn is_pending(&self) -> Result<bool> {
63 let state = self.state.lock()?;
64 Ok(state.is_some())
65 }
66
67 pub fn last_result(&self) -> Result<Option<String>> {
69 let res = self.last_result.lock()?;
70 Ok(res.clone())
71 }
72
73 pub fn progress(&self) -> Result<Option<RemoteProgress>> {
75 let res = self.progress.lock()?;
76 Ok(res.as_ref().map(|progress| progress.clone().into()))
77 }
78
79 pub fn request(&mut self, params: PushRequest) -> Result<()> {
81 log::trace!("request");
82
83 if self.is_pending()? {
84 return Ok(());
85 }
86
87 self.set_request(¶ms)?;
88 RemoteProgress::set_progress(&self.progress, None)?;
89
90 let arc_state = Arc::clone(&self.state);
91 let arc_res = Arc::clone(&self.last_result);
92 let arc_progress = Arc::clone(&self.progress);
93 let sender = self.sender.clone();
94 let repo = self.repo.clone();
95
96 thread::spawn(move || {
97 let (progress_sender, receiver) = unbounded();
98
99 let handle = RemoteProgress::spawn_receiver_thread(
100 AsyncGitNotification::Push,
101 sender.clone(),
102 receiver,
103 arc_progress,
104 );
105
106 let res = push_raw(
107 &repo,
108 params.remote.as_str(),
109 params.branch.as_str(),
110 params.push_type,
111 params.force,
112 params.delete,
113 params.basic_credential.clone(),
114 Some(progress_sender.clone()),
115 );
116
117 progress_sender
118 .send(ProgressNotification::Done)
119 .expect("closing send failed");
120
121 handle.join().expect("joining thread failed");
122
123 Self::set_result(&arc_res, res).expect("result error");
124
125 Self::clear_request(&arc_state).expect("clear error");
126
127 sender
128 .send(AsyncGitNotification::Push)
129 .expect("error sending push");
130 });
131
132 Ok(())
133 }
134
135 fn set_request(&self, _params: &PushRequest) -> Result<()> {
136 let mut state = self.state.lock()?;
137
138 if state.is_some() {
139 return Err(Error::Generic("pending request".into()));
140 }
141
142 *state = Some(PushState {});
143
144 Ok(())
145 }
146
147 fn clear_request(state: &Arc<Mutex<Option<PushState>>>) -> Result<()> {
148 let mut state = state.lock()?;
149
150 *state = None;
151
152 Ok(())
153 }
154
155 fn set_result(arc_result: &Arc<Mutex<Option<String>>>, res: Result<()>) -> Result<()> {
156 let mut last_res = arc_result.lock()?;
157
158 *last_res = match res {
159 Ok(()) => None,
160 Err(e) => {
161 log::error!("push error: {}", e);
162 Some(e.to_string())
163 }
164 };
165
166 Ok(())
167 }
168}