gnostr_asyncgit/
filter_commits.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, AtomicUsize, Ordering},
4        Arc, Mutex,
5    },
6    time::{Duration, Instant},
7};
8
9use rayon::{
10    prelude::ParallelIterator,
11    slice::{ParallelSlice, ParallelSliceMut},
12};
13
14use crate::{
15    asyncjob::{AsyncJob, RunParams},
16    error::Result,
17    sync::{self, CommitId, RepoPath, SharedCommitFilterFn},
18    AsyncGitNotification, ProgressPercent,
19};
20
21///
22pub struct CommitFilterResult {
23    ///
24    pub result: Vec<CommitId>,
25    ///
26    pub duration: Duration,
27}
28
29enum JobState {
30    Request {
31        commits: Vec<CommitId>,
32        repo_path: RepoPath,
33    },
34    Response(Result<CommitFilterResult>),
35}
36
37///
38#[derive(Clone)]
39pub struct AsyncCommitFilterJob {
40    state: Arc<Mutex<Option<JobState>>>,
41    filter: SharedCommitFilterFn,
42    cancellation_flag: Arc<AtomicBool>,
43}
44
45///
46impl AsyncCommitFilterJob {
47    ///
48    pub fn new(
49        repo_path: RepoPath,
50        commits: Vec<CommitId>,
51        filter: SharedCommitFilterFn,
52        cancellation_flag: Arc<AtomicBool>,
53    ) -> Self {
54        Self {
55            state: Arc::new(Mutex::new(Some(JobState::Request { repo_path, commits }))),
56            filter,
57            cancellation_flag,
58        }
59    }
60
61    ///
62    pub fn result(&self) -> Option<Result<CommitFilterResult>> {
63        if let Ok(mut state) = self.state.lock() {
64            if let Some(state) = state.take() {
65                return match state {
66                    JobState::Request { .. } => None,
67                    JobState::Response(result) => Some(result),
68                };
69            }
70        }
71
72        None
73    }
74
75    fn run_request(
76        &self,
77        repo_path: &RepoPath,
78        commits: Vec<CommitId>,
79        params: &RunParams<AsyncGitNotification, ProgressPercent>,
80    ) -> JobState {
81        let result = self
82            .filter_commits(repo_path, commits, params)
83            .map(|(start, result)| CommitFilterResult {
84                result,
85                duration: start.elapsed(),
86            });
87
88        JobState::Response(result)
89    }
90
91    fn filter_commits(
92        &self,
93        repo_path: &RepoPath,
94        commits: Vec<CommitId>,
95        params: &RunParams<AsyncGitNotification, ProgressPercent>,
96    ) -> Result<(Instant, Vec<CommitId>)> {
97        scopetime::scope_time!("filter_commits");
98
99        let total_amount = commits.len();
100        let start = Instant::now();
101
102        //note: for some reason >4 threads degrades search
103        // performance
104        let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
105
106        let idx = AtomicUsize::new(0);
107
108        let mut result = pool.install(|| {
109            commits
110                .into_iter()
111                .enumerate()
112                .collect::<Vec<(usize, CommitId)>>()
113                .par_chunks(1000)
114                .filter_map(|c| {
115                    //TODO: error log repo open errors
116                    sync::repo(repo_path).ok().map(|repo| {
117                        c.iter()
118                            .filter_map(|(e, c)| {
119                                let idx = idx.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120
121                                if self.cancellation_flag.load(Ordering::Relaxed) {
122                                    return None;
123                                }
124
125                                Self::update_progress(
126                                    params,
127                                    ProgressPercent::new(idx, total_amount),
128                                );
129
130                                (*self.filter)(&repo, c)
131                                    .ok()
132                                    .and_then(|res| res.then_some((*e, *c)))
133                            })
134                            .collect::<Vec<_>>()
135                    })
136                })
137                .flatten()
138                .collect::<Vec<_>>()
139        });
140
141        result.par_sort_by(|a, b| a.0.cmp(&b.0));
142
143        let result = result.into_iter().map(|c| c.1).collect();
144
145        Ok((start, result))
146    }
147
148    fn update_progress(
149        params: &RunParams<AsyncGitNotification, ProgressPercent>,
150        new_progress: ProgressPercent,
151    ) {
152        match params.set_progress(new_progress) {
153            Err(e) => log::error!("progress error: {e}"),
154            Ok(result) if result => {
155                if let Err(e) = params.send(AsyncGitNotification::CommitFilter) {
156                    log::error!("send error: {e}");
157                }
158            }
159            _ => (),
160        }
161    }
162}
163
164impl AsyncJob for AsyncCommitFilterJob {
165    type Notification = AsyncGitNotification;
166    type Progress = ProgressPercent;
167
168    fn run(
169        &mut self,
170        params: RunParams<Self::Notification, Self::Progress>,
171    ) -> Result<Self::Notification> {
172        if let Ok(mut state) = self.state.lock() {
173            *state = state.take().map(|state| match state {
174                JobState::Request { commits, repo_path } => {
175                    self.run_request(&repo_path, commits, &params)
176                }
177                JobState::Response(result) => JobState::Response(result),
178            });
179        }
180
181        Ok(AsyncGitNotification::CommitFilter)
182    }
183}