glit_core/
lib.rs

1use crate::{config::RepositoryConfig, repo::RepositoryFactory};
2use ahash::RandomState;
3use async_trait::async_trait;
4use crossbeam_channel::bounded;
5use dashmap::DashMap;
6use futures_util::future::join_all;
7use indicatif::MultiProgress;
8use rayon::ThreadPoolBuilder;
9use repo::Repository;
10use reqwest::{Client, Url};
11use scraper::{Html, Selector};
12use std::{
13    sync::{
14        atomic::{AtomicUsize, Ordering},
15        Arc, Mutex,
16    },
17    time::Instant,
18};
19use tracing::error;
20use types::RepoName;
21
22pub mod config;
23pub mod log;
24pub mod org;
25pub mod repo;
26pub mod types;
27pub mod user;
28
29const NUMBER_OF_REPO_PER_PAGE: usize = 30;
30
31#[async_trait]
32pub trait Factory {
33    async fn _repositories_count(client: &Client, url: Url) -> usize;
34
35    fn _pages_count(repo_count: usize) -> usize {
36        let modulo = repo_count % NUMBER_OF_REPO_PER_PAGE;
37        if modulo.eq(&0) {
38            repo_count / NUMBER_OF_REPO_PER_PAGE
39        } else {
40            ((repo_count - modulo) / NUMBER_OF_REPO_PER_PAGE) + 1
41        }
42    }
43
44    fn _build_repo_links(page_url: Url, repo_count: usize, pages_count: usize) -> Vec<Url> {
45        let mut pages_urls = Vec::with_capacity(repo_count);
46        for i in 1..pages_count + 1 {
47            let url = format!("{}&page={}", page_url, i);
48            pages_urls.push(Url::parse(&url).unwrap());
49        }
50
51        pages_urls
52    }
53}
54
55#[async_trait]
56pub trait ExtractLog {
57    async fn common_log_feature(
58        &self,
59        client: &Client,
60        selector: Selector,
61    ) -> DashMap<RepoName, Repository, ahash::RandomState> {
62        let start_a = Instant::now();
63
64        let repo_count = self.get_repo_count();
65        let all_branches = self.get_all_branches();
66        let pages_urls = self.get_pages_url();
67        let url = self.get_url();
68
69        let (tx_url, rx_url) = bounded(repo_count);
70        let mut tokio_handles = Vec::with_capacity(pages_urls.len());
71        for page in pages_urls {
72            let client = client.clone();
73            let url = url.clone();
74            let tx_url = tx_url.clone();
75            let selector = selector.clone();
76
77            let handle = tokio::spawn(async move {
78                let resp = client.get(page).send().await.unwrap();
79                let text = resp.text().await.unwrap();
80                let parser = Html::parse_document(&text);
81
82                parser
83                    .select(&selector)
84                    .map(|link| {
85                        let endpoint_url = link.value().attr("href").unwrap().to_string();
86                        let repo_name = endpoint_url.split('/').last().unwrap();
87                        let repo_url = format!("{}{}/", url, repo_name);
88                        let sending = tx_url.send(Url::parse(&repo_url).unwrap());
89                        match sending {
90                            Ok(_) => tracing::debug!("Send url {}", repo_url),
91                            Err(e) => {
92                                error!("Failed to send {} with : [{:?}]", repo_url, e)
93                            }
94                        }
95                    })
96                    .for_each(drop);
97            });
98
99            tokio_handles.push(handle);
100        }
101        drop(tx_url);
102
103        let mut queue_handles = Vec::with_capacity(repo_count);
104        let (tx, rx) = bounded(repo_count);
105        let mpb: Arc<Mutex<MultiProgress>> = Arc::new(Mutex::new(MultiProgress::new()));
106
107        for i in 0..repo_count {
108            let tx = tx.clone();
109            let rx_url = rx_url.clone();
110            let mpb = mpb.clone();
111
112            let handle = rayon::spawn(move || {
113                let clonable_url = rx_url.recv().unwrap();
114                let repo_config = RepositoryConfig::new(clonable_url, all_branches);
115
116                let repo = RepositoryFactory::with_config(repo_config).create(mpb);
117
118                tx.send(repo).unwrap();
119                drop(tx);
120            });
121
122            queue_handles.push(handle)
123        }
124        drop(tx);
125        drop(rx_url);
126
127        tracing::info!("Number of threads : {}", rayon::current_num_threads());
128        let current_num_thread = rayon::current_num_threads() - 2;
129        let pool = ThreadPoolBuilder::new()
130            .num_threads(current_num_thread)
131            .build()
132            .unwrap();
133
134        let dash: Arc<DashMap<RepoName, Repository, RandomState>> = Arc::new(
135            DashMap::with_capacity_and_hasher(repo_count, RandomState::new()),
136        );
137        let atomic_count = Arc::new(AtomicUsize::new(0));
138
139        let dash_result: Arc<DashMap<RepoName, Repository, RandomState>> =
140            pool.scope(move |scope| {
141                for _ in 0..repo_count {
142                    let dash = dash.clone();
143                    let rx = rx.clone();
144                    let atomic_count = atomic_count.clone();
145
146                    scope.spawn(move |_| {
147                        let repo = rx.recv().unwrap();
148                        drop(rx);
149
150                        let data = repo.clone().extract_log();
151                        let repo_name_key = RepoName(repo.name.clone());
152                        dash.insert(repo_name_key, data);
153                        atomic_count.fetch_add(1, Ordering::Relaxed);
154                        //println!(
155                        //    "Repository {} handled : {}/{}",
156                        //    repo.name,
157                        //    atomic_count.load(Ordering::Relaxed),
158                        //    repo_count
159                        //);
160                    })
161                }
162                dash
163            });
164        drop(pool);
165
166        join_all(tokio_handles).await;
167
168        tracing::info!(
169            "Fetching and Cloning handled in {:?} for {}",
170            start_a.elapsed(),
171            repo_count
172        );
173
174        Arc::try_unwrap(dash_result).unwrap()
175    }
176
177    async fn extract_log(mut self, client: &Client) -> Self;
178
179    // Common Getter
180    fn get_repo_count(&self) -> usize;
181    fn get_all_branches(&self) -> bool;
182    fn get_url(&self) -> Url;
183    fn get_pages_url(&self) -> Vec<Url>;
184}
185
186pub struct Logger;
187impl Logger {
188    pub async fn log_for<T: ExtractLog>(t: T, client: &Client) -> T {
189        t.extract_log(client).await
190    }
191}