binocular/search/sources/git/
stream.rs1use super::preview::read_history_blob;
2use super::scope::{GitSearchMode, GitSearchScope};
3use crate::infra::channel::{BatchSender, Sender};
4use crate::search::types::SearchItem;
5use anyhow::Context;
6use std::io::{BufRead, BufReader};
7use std::path::PathBuf;
8use std::process::{Command, Stdio};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
/// Batch size handed to every `BatchSender` constructed in this file.
const GIT_BATCH_SIZE: usize = 128;
/// Control character U+001F; `sanitize_path_field` strips it out of paths.
/// NOTE(review): presumably reserved as a delimiter inside history items — confirm against the consumer.
pub const HISTORY_PATH_SEPARATOR: char = '\u{1f}';
/// U+FFFD, substituted for `HISTORY_PATH_SEPARATOR` by `sanitize_path_field`.
const HISTORY_PATH_REPLACEMENT: char = '\u{fffd}';
/// Ref name that `is_current_commit` compares against.
pub const CURRENT_COMMIT_REF: &str = "HEAD";
/// Control character U+001E placed between fields in the git `--format`
/// strings below and used to split each output line back into fields.
const FIELD_SEPARATOR: char = '\u{1e}';
17
18pub fn spawn_git_searcher(
19 scope: GitSearchScope,
20 stop: Arc<AtomicBool>,
21 tx: impl Sender<Vec<SearchItem>>,
22) -> std::thread::JoinHandle<()> {
23 std::thread::spawn(move || {
24 if let Err(err) = stream_git_items(scope, stop, tx.clone()) {
25 let _ = tx.send(vec![SearchItem::message(err.to_string())]);
26 }
27 })
28}
29
30pub fn sanitize_path_field(path: &str) -> String {
31 path.replace(
32 HISTORY_PATH_SEPARATOR,
33 &HISTORY_PATH_REPLACEMENT.to_string(),
34 )
35}
36
/// Reports whether `commit` is exactly the `CURRENT_COMMIT_REF` string.
///
/// The comparison is a plain, case-sensitive string equality.
pub fn is_current_commit(commit: &str) -> bool {
    matches!(commit, CURRENT_COMMIT_REF)
}
40
41fn stream_git_items(
42 scope: GitSearchScope,
43 stop: Arc<AtomicBool>,
44 tx: impl Sender<Vec<SearchItem>>,
45) -> anyhow::Result<()> {
46 match scope.mode {
47 GitSearchMode::History { file } => {
48 let display_path = scope
49 .display_path
50 .unwrap_or_else(|| file.to_string_lossy().into_owned());
51 stream_history(scope.repo_root, file, display_path, stop, tx)
52 }
53 GitSearchMode::Branches => stream_branches(scope.repo_root, stop, tx),
54 GitSearchMode::Commits => stream_commits(scope.repo_root, stop, tx),
55 }
56}
57
58fn stream_history(
59 repo_root: PathBuf,
60 file: PathBuf,
61 display_path: String,
62 stop: Arc<AtomicBool>,
63 tx: impl Sender<Vec<SearchItem>>,
64) -> anyhow::Result<()> {
65 let mut child = Command::new("git")
66 .arg("log")
67 .arg("--format=%H")
68 .arg("--follow")
69 .arg("--")
70 .arg(&file)
71 .current_dir(&repo_root)
72 .stdout(Stdio::piped())
73 .spawn()
74 .context("failed to run git log for history search")?;
75
76 let stdout = child.stdout.take().context("git log stdout unavailable")?;
77 let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
78
79 for line in BufReader::new(stdout).lines() {
80 if stop.load(Ordering::Relaxed) {
81 let _ = child.kill();
82 let _ = child.wait();
83 return Ok(());
84 }
85
86 let hash = line.context("failed to read git log output")?;
87 let hash = hash.trim();
88 if hash.is_empty() {
89 continue;
90 }
91
92 let blob = match read_history_blob(&repo_root, hash, &file) {
93 Ok(blob) => blob,
94 Err(_) => continue,
95 };
96 if blob.contains(&0) {
97 continue;
98 }
99
100 push_history_lines(&mut batcher, hash, &display_path, &blob, &stop);
101 }
102
103 wait_for_success(child, stop, "git log")
104}
105
106fn stream_branches(
107 repo_root: PathBuf,
108 stop: Arc<AtomicBool>,
109 tx: impl Sender<Vec<SearchItem>>,
110) -> anyhow::Result<()> {
111 let format = format!(
112 "%(HEAD){sep}%(refname:short){sep}%(objectname:short){sep}%(contents:subject){sep}%(committerdate:relative)",
113 sep = FIELD_SEPARATOR
114 );
115 let mut child = Command::new("git")
116 .arg("for-each-ref")
117 .arg("--sort=-committerdate")
118 .arg(format!("--format={format}"))
119 .arg("refs/heads")
120 .current_dir(&repo_root)
121 .stdout(Stdio::piped())
122 .spawn()
123 .context("failed to run git for-each-ref for branches")?;
124
125 let stdout = child
126 .stdout
127 .take()
128 .context("git branch stdout unavailable")?;
129 let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
130 for line in BufReader::new(stdout).lines() {
131 if stop.load(Ordering::Relaxed) {
132 let _ = child.kill();
133 let _ = child.wait();
134 return Ok(());
135 }
136
137 let line = line.context("failed to read git branch output")?;
138 let mut parts = line.splitn(5, FIELD_SEPARATOR);
139 let head_marker = parts.next().unwrap_or_default().trim();
140 let branch = parts.next().unwrap_or_default().trim();
141 let commit = parts.next().unwrap_or_default().trim();
142 let subject = parts.next().unwrap_or_default().trim();
143 let relative_time = parts.next().unwrap_or_default().trim();
144 if branch.is_empty() {
145 continue;
146 }
147 batcher.push(SearchItem::git_branch(
148 branch,
149 commit,
150 subject,
151 head_marker == "*",
152 relative_time,
153 ));
154 }
155
156 wait_for_success(child, stop, "git for-each-ref")
157}
158
159fn stream_commits(
160 repo_root: PathBuf,
161 stop: Arc<AtomicBool>,
162 tx: impl Sender<Vec<SearchItem>>,
163) -> anyhow::Result<()> {
164 let format = format!(
165 "%H{sep}%h{sep}%s{sep}%an{sep}%ar{sep}%D",
166 sep = FIELD_SEPARATOR
167 );
168 let mut child = Command::new("git")
169 .arg("log")
170 .arg("--date=short")
171 .arg(format!("--format={format}"))
172 .current_dir(&repo_root)
173 .stdout(Stdio::piped())
174 .spawn()
175 .context("failed to run git log for commits")?;
176
177 let stdout = child
178 .stdout
179 .take()
180 .context("git commit log stdout unavailable")?;
181 let mut batcher = BatchSender::new(tx, GIT_BATCH_SIZE);
182 for line in BufReader::new(stdout).lines() {
183 if stop.load(Ordering::Relaxed) {
184 let _ = child.kill();
185 let _ = child.wait();
186 return Ok(());
187 }
188
189 let line = line.context("failed to read git commit output")?;
190 let mut parts = line.splitn(6, FIELD_SEPARATOR);
191 let commit = parts.next().unwrap_or_default().trim();
192 let short_commit = parts.next().unwrap_or_default().trim();
193 let subject = parts.next().unwrap_or_default().trim();
194 let author = parts.next().unwrap_or_default().trim();
195 let date = parts.next().unwrap_or_default().trim();
196 let refs = parts.next().unwrap_or_default().trim();
197 if commit.is_empty() {
198 continue;
199 }
200 batcher.push(SearchItem::git_commit(
201 commit,
202 short_commit,
203 subject,
204 author,
205 date,
206 refs,
207 ));
208 }
209
210 wait_for_success(child, stop, "git log")
211}
212
213fn push_history_lines<S: Sender<Vec<SearchItem>>>(
214 batcher: &mut BatchSender<SearchItem, S>,
215 commit: &str,
216 display_path: &str,
217 blob: &[u8],
218 stop: &AtomicBool,
219) {
220 let sanitized_path = sanitize_path_field(display_path);
221 let mut line_number = 1usize;
222 for line in BufReader::new(blob).split(b'\n') {
223 if stop.load(Ordering::Relaxed) {
224 break;
225 }
226
227 let Ok(mut line) = line else { break };
228 if line.last() == Some(&b'\r') {
229 line.pop();
230 }
231 let text = String::from_utf8_lossy(&line).into_owned();
232 batcher.push(SearchItem::history_line(
233 commit,
234 &sanitized_path,
235 line_number,
236 text,
237 ));
238 line_number += 1;
239 }
240}
241
242fn wait_for_success(
243 mut child: std::process::Child,
244 stop: Arc<AtomicBool>,
245 command_name: &str,
246) -> anyhow::Result<()> {
247 let status = child
248 .wait()
249 .with_context(|| format!("failed to wait for {command_name}"))?;
250 if !status.success() && !stop.load(Ordering::Relaxed) {
251 anyhow::bail!("{command_name} exited with status {status}");
252 }
253 Ok(())
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// The separator must be gone and replaced one-for-one; checking only
    /// for absence would also pass if the function dropped text.
    #[test]
    fn sanitize_path_field_replaces_internal_separator() {
        let sanitized = sanitize_path_field("dir\u{1f}file.txt");
        assert!(!sanitized.contains(HISTORY_PATH_SEPARATOR));
        assert_eq!(sanitized, "dir\u{fffd}file.txt");
    }

    /// Paths without the separator pass through unchanged.
    #[test]
    fn sanitize_path_field_keeps_clean_paths_unchanged() {
        assert_eq!(sanitize_path_field("src/main.rs"), "src/main.rs");
        assert_eq!(sanitize_path_field(""), "");
    }

    /// Only the exact `CURRENT_COMMIT_REF` string counts as the current commit.
    #[test]
    fn is_current_commit_matches_only_the_current_ref() {
        assert!(is_current_commit(CURRENT_COMMIT_REF));
        assert!(!is_current_commit("head"));
        assert!(!is_current_commit("HEAD~1"));
    }
}
265}