gnostr_asyncgit/
filter_commits.rs1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicBool, AtomicUsize, Ordering},
5 },
6 time::{Duration, Instant},
7};
8
9use rayon::{
10 prelude::ParallelIterator,
11 slice::{ParallelSlice, ParallelSliceMut},
12};
13
14use crate::{
15 AsyncGitNotification, ProgressPercent,
16 asyncjob::{AsyncJob, RunParams},
17 error::Result,
18 sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
19};
20
21pub struct CommitFilterResult {
23 pub result: Vec<CommitId>,
25 pub duration: Duration,
27}
28
29enum JobState {
30 Request {
31 commits: Vec<CommitId>,
32 repo_path: RepoPath,
33 },
34 Response(Result<CommitFilterResult>),
35}
36
37#[derive(Clone)]
39pub struct AsyncCommitFilterJob {
40 state: Arc<Mutex<Option<JobState>>>,
41 filter: SharedCommitFilterFn,
42 cancellation_flag: Arc<AtomicBool>,
43}
44
45impl AsyncCommitFilterJob {
47 pub fn new(
49 repo_path: RepoPath,
50 commits: Vec<CommitId>,
51 filter: SharedCommitFilterFn,
52 cancellation_flag: Arc<AtomicBool>,
53 ) -> Self {
54 Self {
55 state: Arc::new(Mutex::new(Some(JobState::Request {
56 repo_path,
57 commits,
58 }))),
59 filter,
60 cancellation_flag,
61 }
62 }
63
64 pub fn result(&self) -> Option<Result<CommitFilterResult>> {
66 if let Ok(mut state) = self.state.lock() {
67 if let Some(state) = state.take() {
68 return match state {
69 JobState::Request { .. } => None,
70 JobState::Response(result) => Some(result),
71 };
72 }
73 }
74
75 None
76 }
77
78 fn run_request(
79 &self,
80 repo_path: &RepoPath,
81 commits: Vec<CommitId>,
82 params: &RunParams<AsyncGitNotification, ProgressPercent>,
83 ) -> JobState {
84 let result = self
85 .filter_commits(repo_path, commits, params)
86 .map(|(start, result)| CommitFilterResult {
87 result,
88 duration: start.elapsed(),
89 });
90
91 JobState::Response(result)
92 }
93
94 fn filter_commits(
95 &self,
96 repo_path: &RepoPath,
97 commits: Vec<CommitId>,
98 params: &RunParams<AsyncGitNotification, ProgressPercent>,
99 ) -> Result<(Instant, Vec<CommitId>)> {
100 scopetime::scope_time!("filter_commits");
101
102 let total_amount = commits.len();
103 let start = Instant::now();
104
105 let pool =
108 rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
109
110 let idx = AtomicUsize::new(0);
111
112 let mut result = pool.install(|| {
113 commits
114 .into_iter()
115 .enumerate()
116 .collect::<Vec<(usize, CommitId)>>()
117 .par_chunks(1000)
118 .filter_map(|c| {
119 sync::repo(repo_path).ok().map(|repo| {
121 c.iter()
122 .filter_map(|(e, c)| {
123 let idx = idx.fetch_add(
124 1,
125 std::sync::atomic::Ordering::Relaxed,
126 );
127
128 if self
129 .cancellation_flag
130 .load(Ordering::Relaxed)
131 {
132 return None;
133 }
134
135 Self::update_progress(
136 params,
137 ProgressPercent::new(
138 idx,
139 total_amount,
140 ),
141 );
142
143 (*self.filter)(&repo, c)
144 .ok()
145 .and_then(|res| {
146 res.then_some((*e, *c))
147 })
148 })
149 .collect::<Vec<_>>()
150 })
151 })
152 .flatten()
153 .collect::<Vec<_>>()
154 });
155
156 result.par_sort_by(|a, b| a.0.cmp(&b.0));
157
158 let result = result.into_iter().map(|c| c.1).collect();
159
160 Ok((start, result))
161 }
162
163 fn update_progress(
164 params: &RunParams<AsyncGitNotification, ProgressPercent>,
165 new_progress: ProgressPercent,
166 ) {
167 match params.set_progress(new_progress) {
168 Err(e) => log::error!("progress error: {e}"),
169 Ok(result) if result => {
170 if let Err(e) =
171 params.send(AsyncGitNotification::CommitFilter)
172 {
173 log::error!("send error: {e}");
174 }
175 }
176 _ => (),
177 }
178 }
179}
180
181impl AsyncJob for AsyncCommitFilterJob {
182 type Notification = AsyncGitNotification;
183 type Progress = ProgressPercent;
184
185 fn run(
186 &mut self,
187 params: RunParams<Self::Notification, Self::Progress>,
188 ) -> Result<Self::Notification> {
189 if let Ok(mut state) = self.state.lock() {
190 *state = state.take().map(|state| match state {
191 JobState::Request { commits, repo_path } => {
192 self.run_request(&repo_path, commits, ¶ms)
193 }
194 JobState::Response(result) => {
195 JobState::Response(result)
196 }
197 });
198 }
199
200 Ok(AsyncGitNotification::CommitFilter)
201 }
202}