1use chrono::{DateTime, Utc};
3use color_eyre::eyre::{eyre, Result};
4use mktemp::Temp;
5use std::thread::sleep;
6use std::time::Duration;
7use std::{collections::HashMap, path::PathBuf};
8
9use tracing::{debug, error, info};
10
11use git_meta::{GitCommitMeta, GitCredentials, GitRepo};
12
13type BranchHeads = HashMap<String, GitCommitMeta>;
14type PathAlert = HashMap<String, Vec<PathBuf>>;
15
16#[derive(Clone, Debug)]
17pub struct GitRepoWatchHandler {
18 pub repo: GitRepo,
19 pub state: Option<GitRepoState>,
20 pub branch_filter: Option<Vec<String>>,
21 pub path_filter: Option<Vec<PathBuf>>,
22 pub use_shallow: bool,
23 pub poll_freq: Duration,
24}
25
26#[derive(Clone, Debug, Default, PartialEq)]
27pub struct GitRepoState {
28 pub last_updated: Option<DateTime<Utc>>,
29 pub branch_heads: BranchHeads,
30 pub path_alert: PathAlert,
31}
32
33impl GitRepoWatchHandler {
38 pub fn new<U: AsRef<str>>(url: U) -> Result<Self> {
39 Ok(GitRepoWatchHandler {
40 repo: GitRepo::new(url)?,
41 state: None,
42 branch_filter: None,
43 path_filter: None,
44 use_shallow: false,
45 poll_freq: Duration::from_secs(5),
46 })
47 }
48
49 pub fn with_branch(mut self, branch: Option<String>) -> Self {
57 self.repo = self.repo.with_branch(branch);
58 self
59 }
60
61 pub fn with_commit(mut self, id: Option<String>) -> Result<Self> {
65 let tempdir = Temp::new_dir()?;
67
68 let _clone = self.repo.to_clone().git_clone(tempdir.to_path_buf())?;
70 let repo = GitRepo::open(tempdir.to_path_buf(), self.repo.branch.clone(), id.clone())?;
71
72 let repo_head = self
73 .repo
74 .head
75 .clone()
76 .ok_or_else(|| eyre!("Repo HEAD commit is not set"))?;
77 let repo_branch = self
78 .repo
79 .branch
80 .clone()
81 .ok_or_else(|| eyre!("Repo branch is not set"))?;
82
83 let mut path_alert: HashMap<String, Vec<PathBuf>> = HashMap::new();
86 let changed_paths = repo.to_info().list_files_changed_at(repo_head.id.clone())?;
88
89 if let Some(paths) = changed_paths {
93 path_alert.insert(repo_branch.clone(), paths);
94 }
95
96 self.repo = repo;
98
99 let mut head = HashMap::new();
100 head.insert(
101 repo_branch,
102 GitCommitMeta {
103 id: repo_head.id.clone(),
104 message: repo_head.message,
105 timestamp: repo_head.timestamp,
106 },
107 );
108
109 let repo_report = GitRepoState {
110 branch_heads: head,
111 last_updated: Some(Utc::now()),
112 path_alert,
113 };
114
115 self.state = Some(repo_report);
116 self.repo = self.repo.with_commit(id)?;
117 Ok(self)
118 }
119
120 pub fn with_credentials(mut self, creds: Option<GitCredentials>) -> Self {
121 self.repo.credentials = creds;
122 self
123 }
124
125 pub fn with_branch_filter(mut self, branch_list: Option<Vec<String>>) -> Self {
126 self.branch_filter = branch_list;
127 self
128 }
129
130 pub fn with_path_filter(mut self, path_list: Option<Vec<PathBuf>>) -> Self {
131 self.path_filter = path_list;
132 self
133 }
134
135 #[cfg(feature = "shallow_clone")]
136 pub fn with_shallow_clone(mut self, shallow_choice: bool) -> Self {
137 self.use_shallow = shallow_choice;
138 self
139 }
140
141 pub fn with_poll_freq(mut self, frequency: Duration) -> Self {
142 self.poll_freq = frequency;
143 self
144 }
145
146 pub fn state(&self) -> Option<GitRepoState> {
153 self.state.clone()
154 }
155
156 fn _update_state(&mut self) -> Result<GitRepoState> {
157 let prev_state = self.clone();
158
159 let temp_path = Temp::new_dir()?;
161
162 self.repo = if cfg!(feature = "shallow_clone") {
163 match &self.use_shallow {
164 true => {
165 debug!("Shallow clone");
166 self.repo
167 .to_clone()
168 .git_clone_shallow(&temp_path.as_path())?
169 }
170 false => {
171 debug!("Deep clone");
172 self.repo.to_clone().git_clone(&temp_path.as_path())?
173 }
174 }
175 } else {
176 debug!("Deep clone");
177 self.repo.to_clone().git_clone(&temp_path.as_path())?
178 };
179
180 let mut repo_report = GitRepoState::default();
190
191 let branch_heads = self
194 .repo
195 .clone()
196 .to_info()
197 .get_remote_branch_head_refs(self.branch_filter.clone())?;
198
199 repo_report.branch_heads = branch_heads.clone();
200
201 let mut path_alert = HashMap::new();
203
204 for (branch, commit) in branch_heads {
206 if let Some(c) = prev_state
208 .state()
209 .unwrap_or_default()
210 .branch_heads
211 .get(&branch)
212 {
213 let paths = self
214 .repo
215 .to_info()
216 .list_files_changed_between(commit.id, c.clone().id)?;
217 if let Some(p) = paths {
218 path_alert.insert(branch, p);
219 } else {
220 error!("There are no ")
221 }
222 } else {
226 let paths = self.repo.to_info().list_files_changed_at(commit.id)?;
227 if let Some(p) = paths {
228 path_alert.insert(branch, p);
229 }
230 };
231 }
232
233 repo_report.path_alert = path_alert;
234 repo_report.last_updated = Some(Utc::now());
235
236 self.state = Some(repo_report.clone());
239
240 Ok(repo_report)
241 }
242
243 pub async fn update_state(&mut self) -> Result<GitRepoState> {
245 self._update_state()
246 }
247
248 pub fn update_state_sync(&mut self) -> Result<GitRepoState> {
250 self._update_state()
251 }
252
253 pub async fn watch_new_commits(
254 &mut self,
255 pre_run: bool,
256 closure: impl Fn(GitRepoState),
257 ) -> Result<()> {
258 let mut branch_heads_state = self.update_state().await?;
259
260 if pre_run {
261 closure(branch_heads_state.clone());
262 }
263
264 loop {
266 sleep(self.poll_freq);
267
268 let snapshot = self.get_branches_snapshot()?;
269 branch_heads_state = self.update_state().await?;
270
271 self.run_code_if_new_commit_in_branch(branch_heads_state, snapshot.clone(), &closure)?;
273 }
274 }
275
276 fn run_code_if_new_commit_in_branch(
277 &self,
278 current_state: GitRepoState,
279 current_commits: HashMap<String, GitCommitMeta>,
280 closure: impl Fn(GitRepoState) + Copy,
281 ) -> Result<bool> {
282 for (branch, commit) in current_commits {
283 match current_state.branch_heads.get(&branch) {
284 Some(c) => {
285 if &commit == c {
286 info!("No new commits in branch {} found", branch);
287 } else {
288 info!("New commit in branch {} found", branch);
289
290 if let Some(state) = self.state() {
291 closure(state);
292 } else {
293 return Err(eyre!("No state found"));
294 }
295 }
296 }
297 None => {
298 info!("New branch '{}' found", branch);
299 if let Some(state) = self.state() {
300 closure(state);
301 } else {
302 return Err(eyre!("No state found"));
303 }
304 }
305 }
306 }
307 Ok(true)
308 }
309 pub fn watch_new_commits_sync(
310 &mut self,
311 pre_run: bool,
312 closure: impl Fn(GitRepoState),
313 ) -> Result<()> {
314 if pre_run {
315 if let Some(state) = self.state() {
316 closure(state);
317 } else {
318 return Err(eyre!("No state found"));
319 }
320 }
321
322 loop {
323 sleep(self.poll_freq);
324
325 let snapshot = self.get_branches_snapshot()?;
326 let branch_heads_state = self.update_state_sync()?;
327
328 self.run_code_if_new_commit_in_branch(branch_heads_state, snapshot.clone(), &closure)?;
330 }
331 }
332
333 fn get_branches_snapshot(&self) -> Result<HashMap<String, GitCommitMeta>> {
334 if let Some(state) = self.state.clone() {
335 Ok(state.branch_heads)
336 } else {
337 Err(eyre!("Unable to get snapshot of branch HEAD refs"))
338 }
339 }
340}