gnostr_asyncgit/
revlog.rs1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicBool, Ordering},
5 },
6 thread,
7 time::{Duration, Instant},
8};
9
10use crossbeam_channel::Sender;
11use scopetime::scope_time;
12
13use crate::{
14 AsyncGitNotification, Error,
15 error::Result,
16 sync::{
17 CommitId, LogWalker, RepoPath, SharedCommitFilterFn, repo,
18 },
19};
20
/// Outcome of a call to [`AsyncLog::fetch`].
///
/// The enum carries no data, so it is `Copy`: callers can keep a status
/// around after comparing or passing it on.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FetchStatus {
    /// A previously started walk is still running; no new one was started.
    Pending,
    /// The repository head is the one that was already loaded.
    NoChange,
    /// A new background walk was started.
    Started,
}
31
32pub struct AsyncLogResult {
34 pub commits: Vec<CommitId>,
36 pub duration: Duration,
38}
39pub struct AsyncLog {
41 current: Arc<Mutex<AsyncLogResult>>,
42 current_head: Arc<Mutex<Option<CommitId>>>,
43 sender: Sender<AsyncGitNotification>,
44 pending: Arc<AtomicBool>,
45 background: Arc<AtomicBool>,
46 filter: Option<SharedCommitFilterFn>,
47 partial_extract: AtomicBool,
48 repo: RepoPath,
49}
50
/// Batch size handed to the log walker and capacity of the batch buffer.
const LIMIT_COUNT: usize = 3000;
/// Pause between batches while the log is in foreground mode.
const SLEEP_FOREGROUND: Duration = Duration::from_millis(2);
/// Pause between batches while the log is in background mode.
const SLEEP_BACKGROUND: Duration = Duration::from_millis(1000);
54
55impl AsyncLog {
56 pub fn new(
58 repo: RepoPath,
59 sender: &Sender<AsyncGitNotification>,
60 filter: Option<SharedCommitFilterFn>,
61 ) -> Self {
62 Self {
63 repo,
64 current: Arc::new(Mutex::new(AsyncLogResult {
65 commits: Vec::new(),
66 duration: Duration::default(),
67 })),
68 current_head: Arc::new(Mutex::new(None)),
69 sender: sender.clone(),
70 pending: Arc::new(AtomicBool::new(false)),
71 background: Arc::new(AtomicBool::new(false)),
72 filter,
73 partial_extract: AtomicBool::new(false),
74 }
75 }
76
77 pub fn count(&self) -> Result<usize> {
79 Ok(self.current.lock()?.commits.len())
80 }
81
82 pub fn get_slice(
84 &self,
85 start_index: usize,
86 amount: usize,
87 ) -> Result<Vec<CommitId>> {
88 if self.partial_extract.load(Ordering::Relaxed) {
89 return Err(Error::Generic(String::from(
90 "Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
91 )));
92 }
93
94 let list = &self.current.lock()?.commits;
95 let list_len = list.len();
96 let min = start_index.min(list_len);
97 let max = min + amount;
98 let max = max.min(list_len);
99 Ok(list[min..max].to_vec())
100 }
101
102 pub fn get_items(&self) -> Result<Vec<CommitId>> {
104 if self.partial_extract.load(Ordering::Relaxed) {
105 return Err(Error::Generic(String::from(
106 "Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
107 )));
108 }
109
110 let list = &self.current.lock()?.commits;
111 Ok(list.clone())
112 }
113
114 pub fn extract_items(&self) -> Result<Vec<CommitId>> {
116 self.partial_extract.store(true, Ordering::Relaxed);
117 let list = &mut self.current.lock()?.commits;
118 let result = list.clone();
119 list.clear();
120 Ok(result)
121 }
122
123 pub fn get_last_duration(&self) -> Result<Duration> {
125 Ok(self.current.lock()?.duration)
126 }
127
128 pub fn is_pending(&self) -> bool {
130 self.pending.load(Ordering::Relaxed)
131 }
132
133 pub fn set_background(&mut self) {
135 self.background.store(true, Ordering::Relaxed);
136 }
137
138 fn current_head(&self) -> Result<Option<CommitId>> {
140 Ok(*self.current_head.lock()?)
141 }
142
143 fn head_changed(&self) -> Result<bool> {
145 if let Ok(head) = repo(&self.repo)?.head() {
146 return Ok(
147 head.target() != self.current_head()?.map(Into::into)
148 );
149 }
150 Ok(false)
151 }
152
153 pub fn fetch(&mut self) -> Result<FetchStatus> {
155 self.background.store(false, Ordering::Relaxed);
156
157 if self.is_pending() {
158 return Ok(FetchStatus::Pending);
159 }
160
161 if !self.head_changed()? {
162 return Ok(FetchStatus::NoChange);
163 }
164
165 self.pending.store(true, Ordering::Relaxed);
166
167 self.clear()?;
168
169 let arc_current = Arc::clone(&self.current);
170 let sender = self.sender.clone();
171 let arc_pending = Arc::clone(&self.pending);
172 let arc_background = Arc::clone(&self.background);
173 let filter = self.filter.clone();
174 let repo_path = self.repo.clone();
175
176 if let Ok(head) = repo(&self.repo)?.head() {
177 *self.current_head.lock()? =
178 head.target().map(CommitId::new);
179 }
180
181 rayon_core::spawn(move || {
182 scope_time!("async::revlog");
183
184 Self::fetch_helper(
185 &repo_path,
186 &arc_current,
187 &arc_background,
188 &sender,
189 filter,
190 )
191 .expect("failed to fetch");
192
193 arc_pending.store(false, Ordering::Relaxed);
194
195 Self::notify(&sender);
196 });
197
198 Ok(FetchStatus::Started)
199 }
200
201 fn fetch_helper(
202 repo_path: &RepoPath,
203 arc_current: &Arc<Mutex<AsyncLogResult>>,
204 arc_background: &Arc<AtomicBool>,
205 sender: &Sender<AsyncGitNotification>,
206 filter: Option<SharedCommitFilterFn>,
207 ) -> Result<()> {
208 let start_time = Instant::now();
209
210 let mut entries = vec![CommitId::default(); LIMIT_COUNT];
211 entries.resize(0, CommitId::default());
212
213 let r = repo(repo_path)?;
214 let mut walker =
215 LogWalker::new(&r, LIMIT_COUNT)?.filter(filter);
216
217 loop {
218 entries.clear();
219 let read = walker.read(&mut entries)?;
220
221 let mut current = arc_current.lock()?;
222 current.commits.extend(entries.iter());
223 current.duration = start_time.elapsed();
224
225 if read == 0 {
226 break;
227 }
228 Self::notify(sender);
229
230 let sleep_duration =
231 if arc_background.load(Ordering::Relaxed) {
232 SLEEP_BACKGROUND
233 } else {
234 SLEEP_FOREGROUND
235 };
236
237 thread::sleep(sleep_duration);
238 }
239
240 log::trace!("revlog visited: {}", walker.visited());
241
242 Ok(())
243 }
244
245 fn clear(&mut self) -> Result<()> {
246 self.current.lock()?.commits.clear();
247 *self.current_head.lock()? = None;
248 self.partial_extract.store(false, Ordering::Relaxed);
249 Ok(())
250 }
251
252 fn notify(sender: &Sender<AsyncGitNotification>) {
253 sender
254 .send(AsyncGitNotification::Log)
255 .expect("error sending");
256 }
257}