gnostr_asyncgit/
filter_commits.rs1use std::{
2 sync::{
3 atomic::{AtomicBool, AtomicUsize, Ordering},
4 Arc, Mutex,
5 },
6 time::{Duration, Instant},
7};
8
9use rayon::{
10 prelude::ParallelIterator,
11 slice::{ParallelSlice, ParallelSliceMut},
12};
13
14use crate::{
15 asyncjob::{AsyncJob, RunParams},
16 error::Result,
17 sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
18 AsyncGitNotification, ProgressPercent,
19};
20
/// Outcome of a finished commit-filter job.
pub struct CommitFilterResult {
    /// Ids of the commits that the filter accepted, ordered by their
    /// position in the list that was handed to the job.
    pub result: Vec<CommitId>,
    /// Time elapsed between the start of filtering and the moment the
    /// result was assembled.
    pub duration: Duration,
}
28
/// Internal life-cycle of a filter job: it starts as a `Request` and is
/// replaced by a `Response` once the job has been run.
enum JobState {
    /// Work that has not been executed yet.
    Request {
        /// Commits to run the filter over.
        commits: Vec<CommitId>,
        /// Repository the commits are looked up in.
        repo_path: RepoPath,
    },
    /// Outcome of the executed request (success or error).
    Response(Result<CommitFilterResult>),
}
36
/// Asynchronous job that runs a commit filter over a list of commits.
///
/// Cloning is cheap on the state side: `state` and `cancellation_flag`
/// are `Arc`s, so clones observe the same job state and the same flag.
#[derive(Clone)]
pub struct AsyncCommitFilterJob {
    /// Pending request or finished response; `None` once the response has
    /// been taken out.
    state: Arc<Mutex<Option<JobState>>>,
    /// Predicate invoked for every commit to decide whether it is kept.
    filter: SharedCommitFilterFn,
    /// When set to `true`, commits that have not been examined yet are
    /// skipped instead of being passed to the filter.
    cancellation_flag: Arc<AtomicBool>,
}
44
45impl AsyncCommitFilterJob {
47 pub fn new(
49 repo_path: RepoPath,
50 commits: Vec<CommitId>,
51 filter: SharedCommitFilterFn,
52 cancellation_flag: Arc<AtomicBool>,
53 ) -> Self {
54 Self {
55 state: Arc::new(Mutex::new(Some(JobState::Request { repo_path, commits }))),
56 filter,
57 cancellation_flag,
58 }
59 }
60
61 pub fn result(&self) -> Option<Result<CommitFilterResult>> {
63 if let Ok(mut state) = self.state.lock() {
64 if let Some(state) = state.take() {
65 return match state {
66 JobState::Request { .. } => None,
67 JobState::Response(result) => Some(result),
68 };
69 }
70 }
71
72 None
73 }
74
75 fn run_request(
76 &self,
77 repo_path: &RepoPath,
78 commits: Vec<CommitId>,
79 params: &RunParams<AsyncGitNotification, ProgressPercent>,
80 ) -> JobState {
81 let result = self
82 .filter_commits(repo_path, commits, params)
83 .map(|(start, result)| CommitFilterResult {
84 result,
85 duration: start.elapsed(),
86 });
87
88 JobState::Response(result)
89 }
90
91 fn filter_commits(
92 &self,
93 repo_path: &RepoPath,
94 commits: Vec<CommitId>,
95 params: &RunParams<AsyncGitNotification, ProgressPercent>,
96 ) -> Result<(Instant, Vec<CommitId>)> {
97 scopetime::scope_time!("filter_commits");
98
99 let total_amount = commits.len();
100 let start = Instant::now();
101
102 let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
105
106 let idx = AtomicUsize::new(0);
107
108 let mut result = pool.install(|| {
109 commits
110 .into_iter()
111 .enumerate()
112 .collect::<Vec<(usize, CommitId)>>()
113 .par_chunks(1000)
114 .filter_map(|c| {
115 sync::repo(repo_path).ok().map(|repo| {
117 c.iter()
118 .filter_map(|(e, c)| {
119 let idx = idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120
121 if self.cancellation_flag.load(Ordering::Relaxed) {
122 return None;
123 }
124
125 Self::update_progress(
126 params,
127 ProgressPercent::new(idx, total_amount),
128 );
129
130 (*self.filter)(&repo, c)
131 .ok()
132 .and_then(|res| res.then_some((*e, *c)))
133 })
134 .collect::<Vec<_>>()
135 })
136 })
137 .flatten()
138 .collect::<Vec<_>>()
139 });
140
141 result.par_sort_by(|a, b| a.0.cmp(&b.0));
142
143 let result = result.into_iter().map(|c| c.1).collect();
144
145 Ok((start, result))
146 }
147
148 fn update_progress(
149 params: &RunParams<AsyncGitNotification, ProgressPercent>,
150 new_progress: ProgressPercent,
151 ) {
152 match params.set_progress(new_progress) {
153 Err(e) => log::error!("progress error: {e}"),
154 Ok(result) if result => {
155 if let Err(e) = params.send(AsyncGitNotification::CommitFilter) {
156 log::error!("send error: {e}");
157 }
158 }
159 _ => (),
160 }
161 }
162}
163
164impl AsyncJob for AsyncCommitFilterJob {
165 type Notification = AsyncGitNotification;
166 type Progress = ProgressPercent;
167
168 fn run(
169 &mut self,
170 params: RunParams<Self::Notification, Self::Progress>,
171 ) -> Result<Self::Notification> {
172 if let Ok(mut state) = self.state.lock() {
173 *state = state.take().map(|state| match state {
174 JobState::Request { commits, repo_path } => {
175 self.run_request(&repo_path, commits, ¶ms)
176 }
177 JobState::Response(result) => JobState::Response(result),
178 });
179 }
180
181 Ok(AsyncGitNotification::CommitFilter)
182 }
183}