1use std::{collections::HashSet, path::PathBuf, sync::Arc};
2
3use anyhow::Context;
4use dashmap::DashSet;
5use futures::{stream, StreamExt};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8use tracing::{info_span, Instrument};
9
10use crate::{data, git, os};
11
// Command-line arguments for this subcommand; consumed by `Cmd::run`.
//
// Plain `//` comments are used on purpose: clap derive turns `///` doc
// comments into `--help` text.
#[derive(clap::Args, Debug, Clone)]
pub struct Cmd {
    // Database file opened via `data::Storage::connect`; the collected views
    // are written into it.
    #[clap(short, long, default_value = "git-tracker.db")]
    db_file: PathBuf,

    // Upper bound on how many views are passed to a single `store_views` call.
    #[clap(short, long, default_value_t = 1000)]
    batch_size: usize,

    // Forwarded as the `follow` argument of `crate::fs::find_dirs`;
    // presumably controls whether symlinks are followed — TODO confirm.
    #[clap(short, long, default_value_t = false)]
    follow: bool,

    // Paths handed to `crate::fs::find_dirs` as its ignore set. Unlike
    // `search_paths` these are NOT canonicalized — NOTE(review): confirm
    // `find_dirs` matches them as intended against canonicalized roots.
    #[clap(short, long)]
    ignore_paths: Vec<PathBuf>,

    // Positional paths searched for `.git` directories. Each is
    // canonicalized before use, so each must exist.
    search_paths: Vec<PathBuf>,
}
34
35impl Cmd {
36 pub async fn run(&self) -> anyhow::Result<()> {
37 let ignore_paths: HashSet<PathBuf> =
38 self.ignore_paths.iter().cloned().collect();
39 let mut search_paths = Vec::new();
40 for path in &self.search_paths {
41 let path = path
42 .canonicalize()
43 .context(format!("Invalid local path={path:?}"))?;
44 search_paths.push(path);
45 }
46 let locals: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
47 let remotes_ok: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
48 let remotes_err: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
49
50 let host = os::hostname().await?;
51
52 let (urls_tx, urls_rx) = mpsc::unbounded_channel();
53 let (views_tx, views_rx) = mpsc::unbounded_channel();
54 let storage = data::Storage::connect(&self.db_file).await?;
55 let storage = Arc::new(storage);
56
57 let locals_worker = tokio::spawn(
58 {
59 let host = host.clone();
60 let follow = self.follow;
61 let locals = locals.clone();
62 let views_tx = views_tx.clone();
63 async move {
64 let git_dirs = search_paths.iter().flat_map(|path| {
65 crate::fs::find_dirs(
66 path,
67 ".git",
68 follow,
69 &ignore_paths,
70 )
71 });
72 let unique: DashSet<String> = DashSet::new();
73 stream::iter(git_dirs)
75 .for_each_concurrent(None, |dir| async {
76 if git::is_repo(&dir).await {
77 let link = data::Link::Fs { dir };
78 let view = git::view(&host, &link).await;
79 locals.insert(link);
80 for url in view.repo.iter().flat_map(|repo| {
81 repo.remotes.values().cloned()
82 }) {
83 if unique.insert(url.clone()) {
84 urls_tx.send(url).unwrap_or_else(
85 |_| {
86 unreachable!(
87 "urls_rx dropped while \
88 urls_tx is still in use"
89 )
90 },
91 );
92 }
93 }
94 views_tx.send(view).unwrap_or_else(|_| {
95 unreachable!(
96 "view_rx dropped while view_tx \
97 is still in use"
98 )
99 });
100 }
101 })
102 .await;
103 }
104 }
105 .instrument(info_span!("locals_worker"))
106 .in_current_span(),
107 );
108
109 let remotes_worker = tokio::spawn(
110 {
111 let views_tx = views_tx.clone();
112 let remotes_ok = remotes_ok.clone();
113 let remotes_err = remotes_err.clone();
114 async move {
115 UnboundedReceiverStream::new(urls_rx)
116 .for_each_concurrent(None, {
117 move |url: String| {
118 let host = host.clone();
119 let remotes_ok = remotes_ok.clone();
120 let remotes_err = remotes_err.clone();
121 let views_tx = views_tx.clone();
122 async move {
123 let link = data::Link::Net { url };
124 let view = git::view(&host, &link).await;
125 if view.repo.is_some() {
126 remotes_ok.insert(link);
127 } else {
128 remotes_err.insert(link);
129 }
130 views_tx.send(view).unwrap_or_else(
131 |_| {
132 unreachable!(
133 "view_rx dropped while view_tx \
134 is still in use"
135 )
136 },
137 );
138 }
139 }
140 })
141 .await;
142 }
143 }
144 .instrument(info_span!("remotes_worker"))
145 .in_current_span(),
146 );
147
148 let batch_size = self.batch_size;
149 let storage_worker = tokio::spawn(
150 async move {
151 UnboundedReceiverStream::new(views_rx)
152 .chunks(batch_size)
153 .for_each_concurrent(None, move |views| {
154 let storage = storage.clone();
155 async move {
156 match storage.store_views(&views[..]).await {
157 Ok(()) => {
158 tracing::info!("Views store succeeded.");
159 }
160 Err(error) => {
161 tracing::error!(
163 ?error,
164 "Views store failed."
165 );
166 }
167 }
168 }
169 })
170 .await;
171 }
172 .instrument(info_span!("storage_worker"))
173 .in_current_span(),
174 );
175
176 let _ = locals_worker.await;
177 let _ = remotes_worker.await;
178 drop(views_tx); let _ = storage_worker.await;
180
181 tracing::info!(
182 locals = locals.len(),
183 remotes_ok = remotes_ok.len(),
184 remotes_err = remotes_err.len(),
185 "Final counts."
186 );
187 Ok(())
188 }
189}