gnostr_asyncgit/
revlog.rs1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc, Mutex,
5 },
6 thread,
7 time::{Duration, Instant},
8};
9
10use crossbeam_channel::Sender;
11use scopetime::scope_time;
12
13use crate::{
14 error::Result,
15 sync::{repo, CommitId, LogWalker, RepoPath, SharedCommitFilterFn},
16 AsyncGitNotification, Error,
17};
18
/// Outcome of a call to `AsyncLog::fetch`.
#[derive(Debug, PartialEq, Eq)]
pub enum FetchStatus {
    /// A previously started fetch is still running; nothing new was started.
    Pending,
    /// The repository head matches the one recorded by the last fetch.
    NoChange,
    /// A new background walk has been spawned.
    Started,
}
29
30pub struct AsyncLogResult {
32 pub commits: Vec<CommitId>,
34 pub duration: Duration,
36}
37pub struct AsyncLog {
39 current: Arc<Mutex<AsyncLogResult>>,
40 current_head: Arc<Mutex<Option<CommitId>>>,
41 sender: Sender<AsyncGitNotification>,
42 pending: Arc<AtomicBool>,
43 background: Arc<AtomicBool>,
44 filter: Option<SharedCommitFilterFn>,
45 partial_extract: AtomicBool,
46 repo: RepoPath,
47}
48
/// Limit passed to `LogWalker::new`; also the capacity of the batch buffer.
const LIMIT_COUNT: usize = 3000;
/// Pause between two batches while the log is in the foreground.
const SLEEP_FOREGROUND: Duration = Duration::from_millis(2);
/// Pause between two batches after `set_background` was called.
const SLEEP_BACKGROUND: Duration = Duration::from_millis(1000);
52
53impl AsyncLog {
54 pub fn new(
56 repo: RepoPath,
57 sender: &Sender<AsyncGitNotification>,
58 filter: Option<SharedCommitFilterFn>,
59 ) -> Self {
60 Self {
61 repo,
62 current: Arc::new(Mutex::new(AsyncLogResult {
63 commits: Vec::new(),
64 duration: Duration::default(),
65 })),
66 current_head: Arc::new(Mutex::new(None)),
67 sender: sender.clone(),
68 pending: Arc::new(AtomicBool::new(false)),
69 background: Arc::new(AtomicBool::new(false)),
70 filter,
71 partial_extract: AtomicBool::new(false),
72 }
73 }
74
75 pub fn count(&self) -> Result<usize> {
77 Ok(self.current.lock()?.commits.len())
78 }
79
80 pub fn get_slice(&self, start_index: usize, amount: usize) -> Result<Vec<CommitId>> {
82 if self.partial_extract.load(Ordering::Relaxed) {
83 return Err(Error::Generic(String::from(
84 "Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
85 )));
86 }
87
88 let list = &self.current.lock()?.commits;
89 let list_len = list.len();
90 let min = start_index.min(list_len);
91 let max = min + amount;
92 let max = max.min(list_len);
93 Ok(list[min..max].to_vec())
94 }
95
96 pub fn get_items(&self) -> Result<Vec<CommitId>> {
98 if self.partial_extract.load(Ordering::Relaxed) {
99 return Err(Error::Generic(String::from(
100 "Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
101 )));
102 }
103
104 let list = &self.current.lock()?.commits;
105 Ok(list.clone())
106 }
107
108 pub fn extract_items(&self) -> Result<Vec<CommitId>> {
110 self.partial_extract.store(true, Ordering::Relaxed);
111 let list = &mut self.current.lock()?.commits;
112 let result = list.clone();
113 list.clear();
114 Ok(result)
115 }
116
117 pub fn get_last_duration(&self) -> Result<Duration> {
119 Ok(self.current.lock()?.duration)
120 }
121
122 pub fn is_pending(&self) -> bool {
124 self.pending.load(Ordering::Relaxed)
125 }
126
127 pub fn set_background(&mut self) {
129 self.background.store(true, Ordering::Relaxed);
130 }
131
132 fn current_head(&self) -> Result<Option<CommitId>> {
134 Ok(*self.current_head.lock()?)
135 }
136
137 fn head_changed(&self) -> Result<bool> {
139 if let Ok(head) = repo(&self.repo)?.head() {
140 return Ok(head.target() != self.current_head()?.map(Into::into));
141 }
142 Ok(false)
143 }
144
145 pub fn fetch(&mut self) -> Result<FetchStatus> {
147 self.background.store(false, Ordering::Relaxed);
148
149 if self.is_pending() {
150 return Ok(FetchStatus::Pending);
151 }
152
153 if !self.head_changed()? {
154 return Ok(FetchStatus::NoChange);
155 }
156
157 self.pending.store(true, Ordering::Relaxed);
158
159 self.clear()?;
160
161 let arc_current = Arc::clone(&self.current);
162 let sender = self.sender.clone();
163 let arc_pending = Arc::clone(&self.pending);
164 let arc_background = Arc::clone(&self.background);
165 let filter = self.filter.clone();
166 let repo_path = self.repo.clone();
167
168 if let Ok(head) = repo(&self.repo)?.head() {
169 *self.current_head.lock()? = head.target().map(CommitId::new);
170 }
171
172 rayon_core::spawn(move || {
173 scope_time!("async::revlog");
174
175 Self::fetch_helper(&repo_path, &arc_current, &arc_background, &sender, filter)
176 .expect("failed to fetch");
177
178 arc_pending.store(false, Ordering::Relaxed);
179
180 Self::notify(&sender);
181 });
182
183 Ok(FetchStatus::Started)
184 }
185
186 fn fetch_helper(
187 repo_path: &RepoPath,
188 arc_current: &Arc<Mutex<AsyncLogResult>>,
189 arc_background: &Arc<AtomicBool>,
190 sender: &Sender<AsyncGitNotification>,
191 filter: Option<SharedCommitFilterFn>,
192 ) -> Result<()> {
193 let start_time = Instant::now();
194
195 let mut entries = vec![CommitId::default(); LIMIT_COUNT];
196 entries.resize(0, CommitId::default());
197
198 let r = repo(repo_path)?;
199 let mut walker = LogWalker::new(&r, LIMIT_COUNT)?.filter(filter);
200
201 loop {
202 entries.clear();
203 let read = walker.read(&mut entries)?;
204
205 let mut current = arc_current.lock()?;
206 current.commits.extend(entries.iter());
207 current.duration = start_time.elapsed();
208
209 if read == 0 {
210 break;
211 }
212 Self::notify(sender);
213
214 let sleep_duration = if arc_background.load(Ordering::Relaxed) {
215 SLEEP_BACKGROUND
216 } else {
217 SLEEP_FOREGROUND
218 };
219
220 thread::sleep(sleep_duration);
221 }
222
223 log::trace!("revlog visited: {}", walker.visited());
224
225 Ok(())
226 }
227
228 fn clear(&mut self) -> Result<()> {
229 self.current.lock()?.commits.clear();
230 *self.current_head.lock()? = None;
231 self.partial_extract.store(false, Ordering::Relaxed);
232 Ok(())
233 }
234
235 fn notify(sender: &Sender<AsyncGitNotification>) {
236 sender
237 .send(AsyncGitNotification::Log)
238 .expect("error sending");
239 }
240}