gnostr_asyncgit/
revlog.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, Ordering},
4        Arc, Mutex,
5    },
6    thread,
7    time::{Duration, Instant},
8};
9
10use crossbeam_channel::Sender;
11use scopetime::scope_time;
12
13use crate::{
14    error::Result,
15    sync::{repo, CommitId, LogWalker, RepoPath, SharedCommitFilterFn},
16    AsyncGitNotification, Error,
17};
18
/// Outcome of a call to [`AsyncLog::fetch`].
///
/// The enum is a plain set of unit variants, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchStatus {
    /// A previously started walk is still running; no new walk was spawned.
    Pending,
    /// No walk was started because no change of `HEAD` was detected.
    NoChange,
    /// A new walk was started.
    Started,
}
29
30///
31pub struct AsyncLogResult {
32    ///
33    pub commits: Vec<CommitId>,
34    ///
35    pub duration: Duration,
36}
37///
38pub struct AsyncLog {
39    current: Arc<Mutex<AsyncLogResult>>,
40    current_head: Arc<Mutex<Option<CommitId>>>,
41    sender: Sender<AsyncGitNotification>,
42    pending: Arc<AtomicBool>,
43    background: Arc<AtomicBool>,
44    filter: Option<SharedCommitFilterFn>,
45    partial_extract: AtomicBool,
46    repo: RepoPath,
47}
48
/// Batch size handed to the log walker and capacity of the per-batch buffer.
const LIMIT_COUNT: usize = 3_000;
/// Pause between two batches while the log is not in background mode.
const SLEEP_FOREGROUND: Duration = Duration::from_millis(2);
/// Pause between two batches while the log is in background mode.
const SLEEP_BACKGROUND: Duration = Duration::from_millis(1_000);
52
53impl AsyncLog {
54    ///
55    pub fn new(
56        repo: RepoPath,
57        sender: &Sender<AsyncGitNotification>,
58        filter: Option<SharedCommitFilterFn>,
59    ) -> Self {
60        Self {
61            repo,
62            current: Arc::new(Mutex::new(AsyncLogResult {
63                commits: Vec::new(),
64                duration: Duration::default(),
65            })),
66            current_head: Arc::new(Mutex::new(None)),
67            sender: sender.clone(),
68            pending: Arc::new(AtomicBool::new(false)),
69            background: Arc::new(AtomicBool::new(false)),
70            filter,
71            partial_extract: AtomicBool::new(false),
72        }
73    }
74
75    ///
76    pub fn count(&self) -> Result<usize> {
77        Ok(self.current.lock()?.commits.len())
78    }
79
80    ///
81    pub fn get_slice(&self, start_index: usize, amount: usize) -> Result<Vec<CommitId>> {
82        if self.partial_extract.load(Ordering::Relaxed) {
83            return Err(Error::Generic(String::from(
84				"Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
85			)));
86        }
87
88        let list = &self.current.lock()?.commits;
89        let list_len = list.len();
90        let min = start_index.min(list_len);
91        let max = min + amount;
92        let max = max.min(list_len);
93        Ok(list[min..max].to_vec())
94    }
95
96    ///
97    pub fn get_items(&self) -> Result<Vec<CommitId>> {
98        if self.partial_extract.load(Ordering::Relaxed) {
99            return Err(Error::Generic(String::from(
100				"Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
101			)));
102        }
103
104        let list = &self.current.lock()?.commits;
105        Ok(list.clone())
106    }
107
108    ///
109    pub fn extract_items(&self) -> Result<Vec<CommitId>> {
110        self.partial_extract.store(true, Ordering::Relaxed);
111        let list = &mut self.current.lock()?.commits;
112        let result = list.clone();
113        list.clear();
114        Ok(result)
115    }
116
117    ///
118    pub fn get_last_duration(&self) -> Result<Duration> {
119        Ok(self.current.lock()?.duration)
120    }
121
122    ///
123    pub fn is_pending(&self) -> bool {
124        self.pending.load(Ordering::Relaxed)
125    }
126
127    ///
128    pub fn set_background(&mut self) {
129        self.background.store(true, Ordering::Relaxed);
130    }
131
132    ///
133    fn current_head(&self) -> Result<Option<CommitId>> {
134        Ok(*self.current_head.lock()?)
135    }
136
137    ///
138    fn head_changed(&self) -> Result<bool> {
139        if let Ok(head) = repo(&self.repo)?.head() {
140            return Ok(head.target() != self.current_head()?.map(Into::into));
141        }
142        Ok(false)
143    }
144
145    ///
146    pub fn fetch(&mut self) -> Result<FetchStatus> {
147        self.background.store(false, Ordering::Relaxed);
148
149        if self.is_pending() {
150            return Ok(FetchStatus::Pending);
151        }
152
153        if !self.head_changed()? {
154            return Ok(FetchStatus::NoChange);
155        }
156
157        self.pending.store(true, Ordering::Relaxed);
158
159        self.clear()?;
160
161        let arc_current = Arc::clone(&self.current);
162        let sender = self.sender.clone();
163        let arc_pending = Arc::clone(&self.pending);
164        let arc_background = Arc::clone(&self.background);
165        let filter = self.filter.clone();
166        let repo_path = self.repo.clone();
167
168        if let Ok(head) = repo(&self.repo)?.head() {
169            *self.current_head.lock()? = head.target().map(CommitId::new);
170        }
171
172        rayon_core::spawn(move || {
173            scope_time!("async::revlog");
174
175            Self::fetch_helper(&repo_path, &arc_current, &arc_background, &sender, filter)
176                .expect("failed to fetch");
177
178            arc_pending.store(false, Ordering::Relaxed);
179
180            Self::notify(&sender);
181        });
182
183        Ok(FetchStatus::Started)
184    }
185
186    fn fetch_helper(
187        repo_path: &RepoPath,
188        arc_current: &Arc<Mutex<AsyncLogResult>>,
189        arc_background: &Arc<AtomicBool>,
190        sender: &Sender<AsyncGitNotification>,
191        filter: Option<SharedCommitFilterFn>,
192    ) -> Result<()> {
193        let start_time = Instant::now();
194
195        let mut entries = vec![CommitId::default(); LIMIT_COUNT];
196        entries.resize(0, CommitId::default());
197
198        let r = repo(repo_path)?;
199        let mut walker = LogWalker::new(&r, LIMIT_COUNT)?.filter(filter);
200
201        loop {
202            entries.clear();
203            let read = walker.read(&mut entries)?;
204
205            let mut current = arc_current.lock()?;
206            current.commits.extend(entries.iter());
207            current.duration = start_time.elapsed();
208
209            if read == 0 {
210                break;
211            }
212            Self::notify(sender);
213
214            let sleep_duration = if arc_background.load(Ordering::Relaxed) {
215                SLEEP_BACKGROUND
216            } else {
217                SLEEP_FOREGROUND
218            };
219
220            thread::sleep(sleep_duration);
221        }
222
223        log::trace!("revlog visited: {}", walker.visited());
224
225        Ok(())
226    }
227
228    fn clear(&mut self) -> Result<()> {
229        self.current.lock()?.commits.clear();
230        *self.current_head.lock()? = None;
231        self.partial_extract.store(false, Ordering::Relaxed);
232        Ok(())
233    }
234
235    fn notify(sender: &Sender<AsyncGitNotification>) {
236        sender
237            .send(AsyncGitNotification::Log)
238            .expect("error sending");
239    }
240}