// gnostr_asyncgit/filter_commits.rs

use std::{
	sync::{
		Arc, Mutex,
		atomic::{AtomicBool, AtomicUsize, Ordering},
	},
	time::{Duration, Instant},
};

use rayon::{
	prelude::ParallelIterator,
	slice::{ParallelSlice, ParallelSliceMut},
};

use crate::{
	AsyncGitNotification, ProgressPercent,
	asyncjob::{AsyncJob, RunParams},
	error::Result,
	sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
};
20
21///
22pub struct CommitFilterResult {
23	///
24	pub result: Vec<CommitId>,
25	///
26	pub duration: Duration,
27}
28
29enum JobState {
30	Request {
31		commits: Vec<CommitId>,
32		repo_path: RepoPath,
33	},
34	Response(Result<CommitFilterResult>),
35}
36
37///
38#[derive(Clone)]
39pub struct AsyncCommitFilterJob {
40	state: Arc<Mutex<Option<JobState>>>,
41	filter: SharedCommitFilterFn,
42	cancellation_flag: Arc<AtomicBool>,
43}
44
45///
46impl AsyncCommitFilterJob {
47	///
48	pub fn new(
49		repo_path: RepoPath,
50		commits: Vec<CommitId>,
51		filter: SharedCommitFilterFn,
52		cancellation_flag: Arc<AtomicBool>,
53	) -> Self {
54		Self {
55			state: Arc::new(Mutex::new(Some(JobState::Request {
56				repo_path,
57				commits,
58			}))),
59			filter,
60			cancellation_flag,
61		}
62	}
63
64	///
65	pub fn result(&self) -> Option<Result<CommitFilterResult>> {
66		if let Ok(mut state) = self.state.lock() {
67			if let Some(state) = state.take() {
68				return match state {
69					JobState::Request { .. } => None,
70					JobState::Response(result) => Some(result),
71				};
72			}
73		}
74
75		None
76	}
77
78	fn run_request(
79		&self,
80		repo_path: &RepoPath,
81		commits: Vec<CommitId>,
82		params: &RunParams<AsyncGitNotification, ProgressPercent>,
83	) -> JobState {
84		let result = self
85			.filter_commits(repo_path, commits, params)
86			.map(|(start, result)| CommitFilterResult {
87				result,
88				duration: start.elapsed(),
89			});
90
91		JobState::Response(result)
92	}
93
94	fn filter_commits(
95		&self,
96		repo_path: &RepoPath,
97		commits: Vec<CommitId>,
98		params: &RunParams<AsyncGitNotification, ProgressPercent>,
99	) -> Result<(Instant, Vec<CommitId>)> {
100		scopetime::scope_time!("filter_commits");
101
102		let total_amount = commits.len();
103		let start = Instant::now();
104
105		//note: for some reason >4 threads degrades search
106		// performance
107		let pool =
108			rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
109
110		let idx = AtomicUsize::new(0);
111
112		let mut result = pool.install(|| {
113			commits
114				.into_iter()
115				.enumerate()
116				.collect::<Vec<(usize, CommitId)>>()
117				.par_chunks(1000)
118				.filter_map(|c| {
119					//TODO: error log repo open errors
120					sync::repo(repo_path).ok().map(|repo| {
121						c.iter()
122							.filter_map(|(e, c)| {
123								let idx = idx.fetch_add(
124								1,
125								std::sync::atomic::Ordering::Relaxed,
126							);
127
128								if self
129									.cancellation_flag
130									.load(Ordering::Relaxed)
131								{
132									return None;
133								}
134
135								Self::update_progress(
136									params,
137									ProgressPercent::new(
138										idx,
139										total_amount,
140									),
141								);
142
143								(*self.filter)(&repo, c)
144									.ok()
145									.and_then(|res| {
146										res.then_some((*e, *c))
147									})
148							})
149							.collect::<Vec<_>>()
150					})
151				})
152				.flatten()
153				.collect::<Vec<_>>()
154		});
155
156		result.par_sort_by(|a, b| a.0.cmp(&b.0));
157
158		let result = result.into_iter().map(|c| c.1).collect();
159
160		Ok((start, result))
161	}
162
163	fn update_progress(
164		params: &RunParams<AsyncGitNotification, ProgressPercent>,
165		new_progress: ProgressPercent,
166	) {
167		match params.set_progress(new_progress) {
168			Err(e) => log::error!("progress error: {e}"),
169			Ok(result) if result => {
170				if let Err(e) =
171					params.send(AsyncGitNotification::CommitFilter)
172				{
173					log::error!("send error: {e}");
174				}
175			}
176			_ => (),
177		}
178	}
179}
180
181impl AsyncJob for AsyncCommitFilterJob {
182	type Notification = AsyncGitNotification;
183	type Progress = ProgressPercent;
184
185	fn run(
186		&mut self,
187		params: RunParams<Self::Notification, Self::Progress>,
188	) -> Result<Self::Notification> {
189		if let Ok(mut state) = self.state.lock() {
190			*state = state.take().map(|state| match state {
191				JobState::Request { commits, repo_path } => {
192					self.run_request(&repo_path, commits, &params)
193				}
194				JobState::Response(result) => {
195					JobState::Response(result)
196				}
197			});
198		}
199
200		Ok(AsyncGitNotification::CommitFilter)
201	}
202}