git_tracker/cmd/
find.rs

1use std::{collections::HashSet, path::PathBuf, sync::Arc};
2
3use anyhow::Context;
4use dashmap::DashSet;
5use futures::{stream, StreamExt};
6use tokio::sync::mpsc;
7use tokio_stream::wrappers::UnboundedReceiverStream;
8use tracing::{info_span, Instrument};
9
10use crate::{data, git, os};
11
12#[derive(clap::Args, Debug, Clone)]
13pub struct Cmd {
14    /// Database file.
15    #[clap(short, long, default_value = "git-tracker.db")]
16    db_file: PathBuf,
17
18    /// Database insertion batch size - how may views to pack into a single transaction.
19    #[clap(short, long, default_value_t = 1000)]
20    batch_size: usize,
21
22    /// Follow symbollic links.
23    #[clap(short, long, default_value_t = false)]
24    follow: bool,
25
26    // TODO These should actualy be regexp patterns to filter candidate paths.
27    /// Ignore this path when searching for repos.
28    #[clap(short, long)]
29    ignore_paths: Vec<PathBuf>,
30
31    /// Local paths to explore for potential git repos.
32    search_paths: Vec<PathBuf>,
33}
34
35impl Cmd {
36    pub async fn run(&self) -> anyhow::Result<()> {
37        let ignore_paths: HashSet<PathBuf> =
38            self.ignore_paths.iter().cloned().collect();
39        let mut search_paths = Vec::new();
40        for path in &self.search_paths {
41            let path = path
42                .canonicalize()
43                .context(format!("Invalid local path={path:?}"))?;
44            search_paths.push(path);
45        }
46        let locals: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
47        let remotes_ok: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
48        let remotes_err: Arc<DashSet<data::Link>> = Arc::new(DashSet::new());
49
50        let host = os::hostname().await?;
51
52        let (urls_tx, urls_rx) = mpsc::unbounded_channel();
53        let (views_tx, views_rx) = mpsc::unbounded_channel();
54        let storage = data::Storage::connect(&self.db_file).await?;
55        let storage = Arc::new(storage);
56
57        let locals_worker = tokio::spawn(
58            {
59                let host = host.clone();
60                let follow = self.follow;
61                let locals = locals.clone();
62                let views_tx = views_tx.clone();
63                async move {
64                    let git_dirs = search_paths.iter().flat_map(|path| {
65                        crate::fs::find_dirs(
66                            path,
67                            ".git",
68                            follow,
69                            &ignore_paths,
70                        )
71                    });
72                    let unique: DashSet<String> = DashSet::new();
73                    // XXX This has been the fastest combination: sync producer + async consumer.
74                    stream::iter(git_dirs)
75                        .for_each_concurrent(None, |dir| async {
76                            if git::is_repo(&dir).await {
77                                let link = data::Link::Fs { dir };
78                                let view = git::view(&host, &link).await;
79                                locals.insert(link);
80                                for url in view.repo.iter().flat_map(|repo| {
81                                    repo.remotes.values().cloned()
82                                }) {
83                                    if unique.insert(url.clone()) {
84                                        urls_tx.send(url).unwrap_or_else(
85                                            |_| {
86                                                unreachable!(
87                                                    "urls_rx dropped while \
88                                                    urls_tx is still in use"
89                                                )
90                                            },
91                                        );
92                                    }
93                                }
94                                views_tx.send(view).unwrap_or_else(|_| {
95                                    unreachable!(
96                                        "view_rx dropped while view_tx \
97                                        is still in use"
98                                    )
99                                });
100                            }
101                        })
102                        .await;
103                }
104            }
105            .instrument(info_span!("locals_worker"))
106            .in_current_span(),
107        );
108
109        let remotes_worker = tokio::spawn(
110            {
111                let views_tx = views_tx.clone();
112                let remotes_ok = remotes_ok.clone();
113                let remotes_err = remotes_err.clone();
114                async move {
115                    UnboundedReceiverStream::new(urls_rx)
116                        .for_each_concurrent(None, {
117                            move |url: String| {
118                                let host = host.clone();
119                                let remotes_ok = remotes_ok.clone();
120                                let remotes_err = remotes_err.clone();
121                                let views_tx = views_tx.clone();
122                                async move {
123                                    let link = data::Link::Net { url };
124                                    let view = git::view(&host, &link).await;
125                                    if view.repo.is_some() {
126                                        remotes_ok.insert(link);
127                                    } else {
128                                        remotes_err.insert(link);
129                                    }
130                                    views_tx.send(view).unwrap_or_else(
131                                        |_| {
132                                            unreachable!(
133                                                "view_rx dropped while view_tx \
134                                                is still in use"
135                                            )
136                                        },
137                                    );
138                                }
139                            }
140                        })
141                        .await;
142                }
143            }
144            .instrument(info_span!("remotes_worker"))
145            .in_current_span(),
146        );
147
148        let batch_size = self.batch_size;
149        let storage_worker = tokio::spawn(
150            async move {
151                UnboundedReceiverStream::new(views_rx)
152                    .chunks(batch_size)
153                    .for_each_concurrent(None, move |views| {
154                        let storage = storage.clone();
155                        async move {
156                            match storage.store_views(&views[..]).await {
157                                Ok(()) => {
158                                    tracing::info!("Views store succeeded.");
159                                }
160                                Err(error) => {
161                                    // TODO Exit app on storage failure?
162                                    tracing::error!(
163                                        ?error,
164                                        "Views store failed."
165                                    );
166                                }
167                            }
168                        }
169                    })
170                    .await;
171            }
172            .instrument(info_span!("storage_worker"))
173            .in_current_span(),
174        );
175
176        let _ = locals_worker.await;
177        let _ = remotes_worker.await;
178        drop(views_tx); // XXX Otherwise view_rx blocks forever.
179        let _ = storage_worker.await;
180
181        tracing::info!(
182            locals = locals.len(),
183            remotes_ok = remotes_ok.len(),
184            remotes_err = remotes_err.len(),
185            "Final counts."
186        );
187        Ok(())
188    }
189}