1use crate::{config::RepositoryConfig, repo::RepositoryFactory};
2use ahash::RandomState;
3use async_trait::async_trait;
4use crossbeam_channel::bounded;
5use dashmap::DashMap;
6use futures_util::future::join_all;
7use indicatif::MultiProgress;
8use rayon::ThreadPoolBuilder;
9use repo::Repository;
10use reqwest::{Client, Url};
11use scraper::{Html, Selector};
12use std::{
13 sync::{
14 atomic::{AtomicUsize, Ordering},
15 Arc, Mutex,
16 },
17 time::Instant,
18};
19use tracing::error;
20use types::RepoName;
21
22pub mod config;
23pub mod log;
24pub mod org;
25pub mod repo;
26pub mod types;
27pub mod user;
28
29const NUMBER_OF_REPO_PER_PAGE: usize = 30;
30
31#[async_trait]
32pub trait Factory {
33 async fn _repositories_count(client: &Client, url: Url) -> usize;
34
35 fn _pages_count(repo_count: usize) -> usize {
36 let modulo = repo_count % NUMBER_OF_REPO_PER_PAGE;
37 if modulo.eq(&0) {
38 repo_count / NUMBER_OF_REPO_PER_PAGE
39 } else {
40 ((repo_count - modulo) / NUMBER_OF_REPO_PER_PAGE) + 1
41 }
42 }
43
44 fn _build_repo_links(page_url: Url, repo_count: usize, pages_count: usize) -> Vec<Url> {
45 let mut pages_urls = Vec::with_capacity(repo_count);
46 for i in 1..pages_count + 1 {
47 let url = format!("{}&page={}", page_url, i);
48 pages_urls.push(Url::parse(&url).unwrap());
49 }
50
51 pages_urls
52 }
53}
54
#[async_trait]
pub trait ExtractLog {
    /// Scrapes every paginated repository listing, clones each discovered
    /// repository, extracts its log, and returns the results keyed by
    /// repository name.
    ///
    /// Three-stage pipeline, each stage bounded by `repo_count`:
    ///   1. one tokio task per listing page parses the HTML and sends one
    ///      repository URL per matched link into `tx_url`;
    ///   2. one rayon job per expected repository receives a URL, clones the
    ///      repository, and forwards it on `tx`;
    ///   3. a dedicated scoped thread pool receives repositories and runs
    ///      `extract_log` on each, inserting into the shared `DashMap`.
    async fn common_log_feature(
        &self,
        client: &Client,
        selector: Selector,
    ) -> DashMap<RepoName, Repository, ahash::RandomState> {
        let start_a = Instant::now();

        // Inputs supplied by the implementing type.
        let repo_count = self.get_repo_count();
        let all_branches = self.get_all_branches();
        let pages_urls = self.get_pages_url();
        let url = self.get_url();

        // Stage 1: fan out over listing pages.
        let (tx_url, rx_url) = bounded(repo_count);
        let mut tokio_handles = Vec::with_capacity(pages_urls.len());
        for page in pages_urls {
            let client = client.clone();
            let url = url.clone();
            let tx_url = tx_url.clone();
            let selector = selector.clone();

            let handle = tokio::spawn(async move {
                // NOTE(review): network or parse failure panics the task;
                // the panic only surfaces at the `join_all` below.
                let resp = client.get(page).send().await.unwrap();
                let text = resp.text().await.unwrap();
                let parser = Html::parse_document(&text);

                parser
                    .select(&selector)
                    .map(|link| {
                        // The href's last path segment is taken as the repo
                        // name; the absolute repo URL is rebuilt from `url`.
                        let endpoint_url = link.value().attr("href").unwrap().to_string();
                        let repo_name = endpoint_url.split('/').last().unwrap();
                        let repo_url = format!("{}{}/", url, repo_name);
                        let sending = tx_url.send(Url::parse(&repo_url).unwrap());
                        match sending {
                            Ok(_) => tracing::debug!("Send url {}", repo_url),
                            Err(e) => {
                                error!("Failed to send {} with : [{:?}]", repo_url, e)
                            }
                        }
                    })
                    // `map` is used purely for its side effect; drain it.
                    .for_each(drop);
            });

            tokio_handles.push(handle);
        }
        // Drop the original sender so the channel closes once every spawned
        // task has dropped its clone; stage-2 receivers then stop cleanly.
        drop(tx_url);

        // Stage 2: clone repositories on the global rayon pool.
        let mut queue_handles = Vec::with_capacity(repo_count);
        let (tx, rx) = bounded(repo_count);
        let mpb: Arc<Mutex<MultiProgress>> = Arc::new(Mutex::new(MultiProgress::new()));

        // NOTE(review): loop variable `i` is unused, and `rayon::spawn`
        // returns `()`, so `queue_handles` only accumulates unit values —
        // completion is actually driven by the channels, not these handles.
        for i in 0..repo_count {
            let tx = tx.clone();
            let rx_url = rx_url.clone();
            let mpb = mpb.clone();

            let handle = rayon::spawn(move || {
                let clonable_url = rx_url.recv().unwrap();
                let repo_config = RepositoryConfig::new(clonable_url, all_branches);

                let repo = RepositoryFactory::with_config(repo_config).create(mpb);

                tx.send(repo).unwrap();
                drop(tx);
            });

            queue_handles.push(handle)
        }
        // Close stage-2's sender and release this function's receiver clone
        // so both channels can terminate when the workers finish.
        drop(tx);
        drop(rx_url);

        tracing::info!("Number of threads : {}", rayon::current_num_threads());
        // Two threads are held back from the dedicated pool — presumably to
        // leave headroom for the async runtime / global pool; TODO confirm.
        let current_num_thread = rayon::current_num_threads() - 2;
        let pool = ThreadPoolBuilder::new()
            .num_threads(current_num_thread)
            .build()
            .unwrap();

        // Shared result map; ahash::RandomState for the hasher.
        let dash: Arc<DashMap<RepoName, Repository, RandomState>> = Arc::new(
            DashMap::with_capacity_and_hasher(repo_count, RandomState::new()),
        );
        let atomic_count = Arc::new(AtomicUsize::new(0));

        // Stage 3: the scope blocks until every spawned job completes, so
        // every per-job Arc clone is dropped before `dash` is returned.
        let dash_result: Arc<DashMap<RepoName, Repository, RandomState>> =
            pool.scope(move |scope| {
                for _ in 0..repo_count {
                    let dash = dash.clone();
                    let rx = rx.clone();
                    let atomic_count = atomic_count.clone();

                    scope.spawn(move |_| {
                        let repo = rx.recv().unwrap();
                        drop(rx);

                        let data = repo.clone().extract_log();
                        let repo_name_key = RepoName(repo.name.clone());
                        dash.insert(repo_name_key, data);
                        // Simple tally; Relaxed ordering is sufficient.
                        atomic_count.fetch_add(1, Ordering::Relaxed);
                    })
                }
                dash
            });
        drop(pool);

        // Await the page-scraping tasks (normally long finished) so any
        // panics inside them are surfaced.
        join_all(tokio_handles).await;

        tracing::info!(
            "Fetching and Cloning handled in {:?} for {}",
            start_a.elapsed(),
            repo_count
        );

        // Sound: the scope above joined all workers, so this Arc is the
        // sole remaining owner.
        Arc::try_unwrap(dash_result).unwrap()
    }

    /// Consumes `self`, runs the extraction, and returns the updated value.
    async fn extract_log(mut self, client: &Client) -> Self;

    // Accessors implemented by each target (user/org/repo) type.
    fn get_repo_count(&self) -> usize;
    // presumably: clone all branches rather than only the default — confirm
    // against `RepositoryConfig::new`.
    fn get_all_branches(&self) -> bool;
    fn get_url(&self) -> Url;
    fn get_pages_url(&self) -> Vec<Url>;
}
185
186pub struct Logger;
187impl Logger {
188 pub async fn log_for<T: ExtractLog>(t: T, client: &Client) -> T {
189 t.extract_log(client).await
190 }
191}