Skip to main content

binocular/search/sources/git/
stream.rs

1use super::preview::read_history_blob;
2use super::scope::{GitSearchMode, GitSearchScope};
3use crate::infra::channel::{BatchSender, Sender};
4use crate::search::types::SearchItem;
5use anyhow::Context;
6use std::io::{BufRead, BufReader};
7use std::path::PathBuf;
8use std::process::{Command, Stdio};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
/// Batch size handed to every `BatchSender` created by the git streams in this module.
const GIT_BATCH_SIZE: usize = 128;

/// U+001E; joins the fields of the `--format` strings given to git and is the
/// delimiter each output line is split on.
const FIELD_SEPARATOR: char = '\x1e';

/// U+001F; stripped out of display paths by `sanitize_path_field`.
///
/// NOTE(review): presumably reserved as a delimiter when history items are
/// encoded elsewhere — confirm against the consumers of this constant.
pub const HISTORY_PATH_SEPARATOR: char = '\x1f';

/// U+FFFD; the character substituted for `HISTORY_PATH_SEPARATOR` in display paths.
const HISTORY_PATH_REPLACEMENT: char = char::REPLACEMENT_CHARACTER;

/// Ref name that `is_current_commit` compares against.
pub const CURRENT_COMMIT_REF: &str = "HEAD";
17
18pub fn spawn_git_searcher(
19    scope: GitSearchScope,
20    stop: Arc<AtomicBool>,
21    tx: impl Sender<Vec<SearchItem>>,
22) -> std::thread::JoinHandle<()> {
23    std::thread::spawn(move || {
24        if let Err(err) = stream_git_items(scope, stop, tx.clone()) {
25            let _ = tx.send(vec![SearchItem::message(err.to_string())]);
26        }
27    })
28}
29
30pub fn sanitize_path_field(path: &str) -> String {
31    path.replace(
32        HISTORY_PATH_SEPARATOR,
33        &HISTORY_PATH_REPLACEMENT.to_string(),
34    )
35}
36
37pub fn is_current_commit(commit: &str) -> bool {
38    commit == CURRENT_COMMIT_REF
39}
40
41fn stream_git_items(
42    scope: GitSearchScope,
43    stop: Arc<AtomicBool>,
44    tx: impl Sender<Vec<SearchItem>>,
45) -> anyhow::Result<()> {
46    match scope.mode {
47        GitSearchMode::History { file } => {
48            let display_path = scope
49                .display_path
50                .unwrap_or_else(|| file.to_string_lossy().into_owned());
51            stream_history(scope.repo_root, file, display_path, stop, tx)
52        }
53        GitSearchMode::Branches => stream_branches(scope.repo_root, stop, tx),
54        GitSearchMode::Commits => stream_commits(scope.repo_root, stop, tx),
55    }
56}
57
58fn stream_history(
59    repo_root: PathBuf,
60    file: PathBuf,
61    display_path: String,
62    stop: Arc<AtomicBool>,
63    tx: impl Sender<Vec<SearchItem>>,
64) -> anyhow::Result<()> {
65    let mut child = Command::new("git")
66        .arg("log")
67        .arg("--format=%H")
68        .arg("--follow")
69        .arg("--")
70        .arg(&file)
71        .current_dir(&repo_root)
72        .stdout(Stdio::piped())
73        .spawn()
74        .context("failed to run git log for history search")?;
75
76    let stdout = child.stdout.take().context("git log stdout unavailable")?;
77    let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
78
79    for line in BufReader::new(stdout).lines() {
80        if stop.load(Ordering::Relaxed) {
81            let _ = child.kill();
82            let _ = child.wait();
83            return Ok(());
84        }
85
86        let hash = line.context("failed to read git log output")?;
87        let hash = hash.trim();
88        if hash.is_empty() {
89            continue;
90        }
91
92        let blob = match read_history_blob(&repo_root, hash, &file) {
93            Ok(blob) => blob,
94            Err(_) => continue,
95        };
96        if blob.contains(&0) {
97            continue;
98        }
99
100        push_history_lines(&mut batcher, hash, &display_path, &blob, &stop);
101    }
102
103    wait_for_success(child, stop, "git log")
104}
105
106fn stream_branches(
107    repo_root: PathBuf,
108    stop: Arc<AtomicBool>,
109    tx: impl Sender<Vec<SearchItem>>,
110) -> anyhow::Result<()> {
111    let format = format!(
112        "%(HEAD){sep}%(refname:short){sep}%(objectname:short){sep}%(contents:subject){sep}%(committerdate:relative)",
113        sep = FIELD_SEPARATOR
114    );
115    let mut child = Command::new("git")
116        .arg("for-each-ref")
117        .arg("--sort=-committerdate")
118        .arg(format!("--format={format}"))
119        .arg("refs/heads")
120        .current_dir(&repo_root)
121        .stdout(Stdio::piped())
122        .spawn()
123        .context("failed to run git for-each-ref for branches")?;
124
125    let stdout = child
126        .stdout
127        .take()
128        .context("git branch stdout unavailable")?;
129    let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
130    for line in BufReader::new(stdout).lines() {
131        if stop.load(Ordering::Relaxed) {
132            let _ = child.kill();
133            let _ = child.wait();
134            return Ok(());
135        }
136
137        let line = line.context("failed to read git branch output")?;
138        let mut parts = line.splitn(5, FIELD_SEPARATOR);
139        let head_marker = parts.next().unwrap_or_default().trim();
140        let branch = parts.next().unwrap_or_default().trim();
141        let commit = parts.next().unwrap_or_default().trim();
142        let subject = parts.next().unwrap_or_default().trim();
143        let relative_time = parts.next().unwrap_or_default().trim();
144        if branch.is_empty() {
145            continue;
146        }
147        batcher.push(SearchItem::git_branch(
148            branch,
149            commit,
150            subject,
151            head_marker == "*",
152            relative_time,
153        ));
154    }
155
156    wait_for_success(child, stop, "git for-each-ref")
157}
158
159fn stream_commits(
160    repo_root: PathBuf,
161    stop: Arc<AtomicBool>,
162    tx: impl Sender<Vec<SearchItem>>,
163) -> anyhow::Result<()> {
164    let format = format!(
165        "%H{sep}%h{sep}%s{sep}%an{sep}%ar{sep}%D",
166        sep = FIELD_SEPARATOR
167    );
168    let mut child = Command::new("git")
169        .arg("log")
170        .arg("--date=short")
171        .arg(format!("--format={format}"))
172        .current_dir(&repo_root)
173        .stdout(Stdio::piped())
174        .spawn()
175        .context("failed to run git log for commits")?;
176
177    let stdout = child
178        .stdout
179        .take()
180        .context("git commit log stdout unavailable")?;
181    let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
182    for line in BufReader::new(stdout).lines() {
183        if stop.load(Ordering::Relaxed) {
184            let _ = child.kill();
185            let _ = child.wait();
186            return Ok(());
187        }
188
189        let line = line.context("failed to read git commit output")?;
190        let mut parts = line.splitn(6, FIELD_SEPARATOR);
191        let commit = parts.next().unwrap_or_default().trim();
192        let short_commit = parts.next().unwrap_or_default().trim();
193        let subject = parts.next().unwrap_or_default().trim();
194        let author = parts.next().unwrap_or_default().trim();
195        let date = parts.next().unwrap_or_default().trim();
196        let refs = parts.next().unwrap_or_default().trim();
197        if commit.is_empty() {
198            continue;
199        }
200        batcher.push(SearchItem::git_commit(
201            commit,
202            short_commit,
203            subject,
204            author,
205            date,
206            refs,
207        ));
208    }
209
210    wait_for_success(child, stop, "git log")
211}
212
213fn push_history_lines<S: Sender<Vec<SearchItem>>>(
214    batcher: &mut BatchSender<SearchItem, S>,
215    commit: &str,
216    display_path: &str,
217    blob: &[u8],
218    stop: &AtomicBool,
219) {
220    let sanitized_path = sanitize_path_field(display_path);
221    let mut line_number = 1usize;
222    for line in BufReader::new(blob).split(b'\n') {
223        if stop.load(Ordering::Relaxed) {
224            break;
225        }
226
227        let Ok(mut line) = line else { break };
228        if line.last() == Some(&b'\r') {
229            line.pop();
230        }
231        let text = String::from_utf8_lossy(&line).into_owned();
232        batcher.push(SearchItem::history_line(
233            commit,
234            &sanitized_path,
235            line_number,
236            text,
237        ));
238        line_number += 1;
239    }
240}
241
242fn wait_for_success(
243    mut child: std::process::Child,
244    stop: Arc<AtomicBool>,
245    command_name: &str,
246) -> anyhow::Result<()> {
247    let status = child
248        .wait()
249        .with_context(|| format!("failed to wait for {command_name}"))?;
250    if !status.success() && !stop.load(Ordering::Relaxed) {
251        anyhow::bail!("{command_name} exited with status {status}");
252    }
253    Ok(())
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// The separator must be swapped for the replacement character while the
    /// rest of the path is preserved — not merely be absent from the output.
    #[test]
    fn sanitize_path_field_replaces_internal_separator() {
        let sanitized = sanitize_path_field("dir\u{1f}file.txt");
        assert!(!sanitized.contains(HISTORY_PATH_SEPARATOR));
        assert_eq!(
            sanitized,
            format!("dir{HISTORY_PATH_REPLACEMENT}file.txt")
        );
    }

    /// Paths without the separator come back unchanged.
    #[test]
    fn sanitize_path_field_keeps_clean_paths() {
        assert_eq!(sanitize_path_field("src/lib.rs"), "src/lib.rs");
        assert_eq!(sanitize_path_field(""), "");
    }

    /// Only the exact ref string counts as the current commit.
    #[test]
    fn is_current_commit_matches_only_exact_ref() {
        assert!(is_current_commit(CURRENT_COMMIT_REF));
        assert!(!is_current_commit("head"));
        assert!(!is_current_commit("HEAD~1"));
        assert!(!is_current_commit(""));
    }
}
265}