// gnostr_asyncgit/revlog.rs

use std::{
	mem,
	sync::{
		Arc, Mutex,
		atomic::{AtomicBool, Ordering},
	},
	thread,
	time::{Duration, Instant},
};

use crossbeam_channel::Sender;
use scopetime::scope_time;

use crate::{
	AsyncGitNotification, Error,
	error::Result,
	sync::{
		CommitId, LogWalker, RepoPath, SharedCommitFilterFn, repo,
	},
};
20
/// Outcome of a call to `AsyncLog::fetch`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FetchStatus {
	/// A previously started walk has not finished yet; nothing new
	/// was started.
	Pending,
	/// The repository head matches the one recorded by the last
	/// walk (or could not be resolved), so no walk was started.
	NoChange,
	/// The stored commits were cleared and a new background walk
	/// was started.
	Started,
}
31
32///
33pub struct AsyncLogResult {
34	///
35	pub commits: Vec<CommitId>,
36	///
37	pub duration: Duration,
38}
39///
40pub struct AsyncLog {
41	current: Arc<Mutex<AsyncLogResult>>,
42	current_head: Arc<Mutex<Option<CommitId>>>,
43	sender: Sender<AsyncGitNotification>,
44	pending: Arc<AtomicBool>,
45	background: Arc<AtomicBool>,
46	filter: Option<SharedCommitFilterFn>,
47	partial_extract: AtomicBool,
48	repo: RepoPath,
49}
50
/// Batch size: handed to `LogWalker::new` as its limit and used as
/// the capacity of the per-batch buffer.
const LIMIT_COUNT: usize = 3000;
/// Pause between batches while the log is in the foreground.
const SLEEP_FOREGROUND: Duration = Duration::from_millis(2);
/// Pause between batches after `set_background` was called.
const SLEEP_BACKGROUND: Duration = Duration::from_millis(1000);
54
55impl AsyncLog {
56	///
57	pub fn new(
58		repo: RepoPath,
59		sender: &Sender<AsyncGitNotification>,
60		filter: Option<SharedCommitFilterFn>,
61	) -> Self {
62		Self {
63			repo,
64			current: Arc::new(Mutex::new(AsyncLogResult {
65				commits: Vec::new(),
66				duration: Duration::default(),
67			})),
68			current_head: Arc::new(Mutex::new(None)),
69			sender: sender.clone(),
70			pending: Arc::new(AtomicBool::new(false)),
71			background: Arc::new(AtomicBool::new(false)),
72			filter,
73			partial_extract: AtomicBool::new(false),
74		}
75	}
76
77	///
78	pub fn count(&self) -> Result<usize> {
79		Ok(self.current.lock()?.commits.len())
80	}
81
82	///
83	pub fn get_slice(
84		&self,
85		start_index: usize,
86		amount: usize,
87	) -> Result<Vec<CommitId>> {
88		if self.partial_extract.load(Ordering::Relaxed) {
89			return Err(Error::Generic(String::from(
90				"Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
91			)));
92		}
93
94		let list = &self.current.lock()?.commits;
95		let list_len = list.len();
96		let min = start_index.min(list_len);
97		let max = min + amount;
98		let max = max.min(list_len);
99		Ok(list[min..max].to_vec())
100	}
101
102	///
103	pub fn get_items(&self) -> Result<Vec<CommitId>> {
104		if self.partial_extract.load(Ordering::Relaxed) {
105			return Err(Error::Generic(String::from(
106				"Faulty usage of AsyncLog: Cannot partially extract items and rely on get_items slice to still work!",
107			)));
108		}
109
110		let list = &self.current.lock()?.commits;
111		Ok(list.clone())
112	}
113
114	///
115	pub fn extract_items(&self) -> Result<Vec<CommitId>> {
116		self.partial_extract.store(true, Ordering::Relaxed);
117		let list = &mut self.current.lock()?.commits;
118		let result = list.clone();
119		list.clear();
120		Ok(result)
121	}
122
123	///
124	pub fn get_last_duration(&self) -> Result<Duration> {
125		Ok(self.current.lock()?.duration)
126	}
127
128	///
129	pub fn is_pending(&self) -> bool {
130		self.pending.load(Ordering::Relaxed)
131	}
132
133	///
134	pub fn set_background(&mut self) {
135		self.background.store(true, Ordering::Relaxed);
136	}
137
138	///
139	fn current_head(&self) -> Result<Option<CommitId>> {
140		Ok(*self.current_head.lock()?)
141	}
142
143	///
144	fn head_changed(&self) -> Result<bool> {
145		if let Ok(head) = repo(&self.repo)?.head() {
146			return Ok(
147				head.target() != self.current_head()?.map(Into::into)
148			);
149		}
150		Ok(false)
151	}
152
153	///
154	pub fn fetch(&mut self) -> Result<FetchStatus> {
155		self.background.store(false, Ordering::Relaxed);
156
157		if self.is_pending() {
158			return Ok(FetchStatus::Pending);
159		}
160
161		if !self.head_changed()? {
162			return Ok(FetchStatus::NoChange);
163		}
164
165		self.pending.store(true, Ordering::Relaxed);
166
167		self.clear()?;
168
169		let arc_current = Arc::clone(&self.current);
170		let sender = self.sender.clone();
171		let arc_pending = Arc::clone(&self.pending);
172		let arc_background = Arc::clone(&self.background);
173		let filter = self.filter.clone();
174		let repo_path = self.repo.clone();
175
176		if let Ok(head) = repo(&self.repo)?.head() {
177			*self.current_head.lock()? =
178				head.target().map(CommitId::new);
179		}
180
181		rayon_core::spawn(move || {
182			scope_time!("async::revlog");
183
184			Self::fetch_helper(
185				&repo_path,
186				&arc_current,
187				&arc_background,
188				&sender,
189				filter,
190			)
191			.expect("failed to fetch");
192
193			arc_pending.store(false, Ordering::Relaxed);
194
195			Self::notify(&sender);
196		});
197
198		Ok(FetchStatus::Started)
199	}
200
201	fn fetch_helper(
202		repo_path: &RepoPath,
203		arc_current: &Arc<Mutex<AsyncLogResult>>,
204		arc_background: &Arc<AtomicBool>,
205		sender: &Sender<AsyncGitNotification>,
206		filter: Option<SharedCommitFilterFn>,
207	) -> Result<()> {
208		let start_time = Instant::now();
209
210		let mut entries = vec![CommitId::default(); LIMIT_COUNT];
211		entries.resize(0, CommitId::default());
212
213		let r = repo(repo_path)?;
214		let mut walker =
215			LogWalker::new(&r, LIMIT_COUNT)?.filter(filter);
216
217		loop {
218			entries.clear();
219			let read = walker.read(&mut entries)?;
220
221			let mut current = arc_current.lock()?;
222			current.commits.extend(entries.iter());
223			current.duration = start_time.elapsed();
224
225			if read == 0 {
226				break;
227			}
228			Self::notify(sender);
229
230			let sleep_duration =
231				if arc_background.load(Ordering::Relaxed) {
232					SLEEP_BACKGROUND
233				} else {
234					SLEEP_FOREGROUND
235				};
236
237			thread::sleep(sleep_duration);
238		}
239
240		log::trace!("revlog visited: {}", walker.visited());
241
242		Ok(())
243	}
244
245	fn clear(&mut self) -> Result<()> {
246		self.current.lock()?.commits.clear();
247		*self.current_head.lock()? = None;
248		self.partial_extract.store(false, Ordering::Relaxed);
249		Ok(())
250	}
251
252	fn notify(sender: &Sender<AsyncGitNotification>) {
253		sender
254			.send(AsyncGitNotification::Log)
255			.expect("error sending");
256	}
257}