gnostr_asyncgit/
pull.rs

1use std::{
2    sync::{Arc, Mutex},
3    thread,
4};
5
6use crossbeam_channel::{unbounded, Sender};
7
8use crate::{
9    error::{Error, Result},
10    sync::{
11        cred::BasicAuthCredential,
12        remotes::{fetch, push::ProgressNotification},
13        RepoPath,
14    },
15    AsyncGitNotification, RemoteProgress,
16};
17
18///
19#[derive(Default, Clone, Debug)]
20pub struct FetchRequest {
21    ///
22    pub remote: String,
23    ///
24    pub branch: String,
25    ///
26    pub basic_credential: Option<BasicAuthCredential>,
27}
28
// TODO: since this is empty we can go with a simple AtomicBool to
// mark that we are fetching or not
/// Marker value stored while a fetch request is in flight; carries no data.
#[derive(Default, Clone, Debug)]
struct FetchState {}
33
34///
35pub struct AsyncPull {
36    state: Arc<Mutex<Option<FetchState>>>,
37    last_result: Arc<Mutex<Option<(usize, String)>>>,
38    progress: Arc<Mutex<Option<ProgressNotification>>>,
39    sender: Sender<AsyncGitNotification>,
40    repo: RepoPath,
41}
42
43impl AsyncPull {
44    ///
45    pub fn new(repo: RepoPath, sender: &Sender<AsyncGitNotification>) -> Self {
46        Self {
47            repo,
48            state: Arc::new(Mutex::new(None)),
49            last_result: Arc::new(Mutex::new(None)),
50            progress: Arc::new(Mutex::new(None)),
51            sender: sender.clone(),
52        }
53    }
54
55    ///
56    pub fn is_pending(&self) -> Result<bool> {
57        let state = self.state.lock()?;
58        Ok(state.is_some())
59    }
60
61    ///
62    pub fn last_result(&self) -> Result<Option<(usize, String)>> {
63        let res = self.last_result.lock()?;
64        Ok(res.clone())
65    }
66
67    ///
68    pub fn progress(&self) -> Result<Option<RemoteProgress>> {
69        let res = self.progress.lock()?;
70        Ok(res.as_ref().map(|progress| progress.clone().into()))
71    }
72
73    ///
74    pub fn request(&mut self, params: FetchRequest) -> Result<()> {
75        log::trace!("request");
76
77        if self.is_pending()? {
78            return Ok(());
79        }
80
81        self.set_request(&params)?;
82        RemoteProgress::set_progress(&self.progress, None)?;
83
84        let arc_state = Arc::clone(&self.state);
85        let arc_res = Arc::clone(&self.last_result);
86        let arc_progress = Arc::clone(&self.progress);
87        let sender = self.sender.clone();
88        let repo = self.repo.clone();
89
90        thread::spawn(move || {
91            let (progress_sender, receiver) = unbounded();
92
93            let handle = RemoteProgress::spawn_receiver_thread(
94                AsyncGitNotification::Pull,
95                sender.clone(),
96                receiver,
97                arc_progress,
98            );
99
100            let res = fetch(
101                &repo,
102                &params.branch,
103                params.basic_credential,
104                Some(progress_sender.clone()),
105            );
106
107            progress_sender
108                .send(ProgressNotification::Done)
109                .expect("closing send failed");
110
111            handle.join().expect("joining thread failed");
112
113            Self::set_result(&arc_res, res).expect("result error");
114
115            Self::clear_request(&arc_state).expect("clear error");
116
117            sender
118                .send(AsyncGitNotification::Pull)
119                .expect("AsyncNotification error");
120        });
121
122        Ok(())
123    }
124
125    fn set_request(&self, _params: &FetchRequest) -> Result<()> {
126        let mut state = self.state.lock()?;
127
128        if state.is_some() {
129            return Err(Error::Generic("pending request".into()));
130        }
131
132        *state = Some(FetchState {});
133
134        Ok(())
135    }
136
137    fn clear_request(state: &Arc<Mutex<Option<FetchState>>>) -> Result<()> {
138        let mut state = state.lock()?;
139
140        *state = None;
141
142        Ok(())
143    }
144
145    fn set_result(
146        arc_result: &Arc<Mutex<Option<(usize, String)>>>,
147        res: Result<usize>,
148    ) -> Result<()> {
149        let mut last_res = arc_result.lock()?;
150
151        *last_res = match res {
152            Ok(bytes) => Some((bytes, String::new())),
153            Err(e) => {
154                log::error!("fetch error: {}", e);
155                Some((0, e.to_string()))
156            }
157        };
158
159        Ok(())
160    }
161}