use std::{
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex},
time::Instant,
};
use anyhow::{Result, anyhow, bail};
use auth_git2::GitAuthenticator;
use git2::{Progress, Repository};
use crate::{
cli_interactor::count_lines_per_msg_vec,
git::{
Repo, RepoActions,
nostr_url::{CloneUrl, NostrUrlDecoded, ServerProtocol},
utils::check_ssh_keys,
},
utils::{Direction, get_read_protocols_to_try, join_with_and, set_protocol_preference},
};
pub fn fetch_from_git_server(
git_repo: &Repo,
oids: &[String],
git_server_url: &str,
decoded_nostr_url: &NostrUrlDecoded,
term: &console::Term,
is_grasp_server: bool,
) -> Result<()> {
let mut missing_oids: Vec<&String> = Vec::new();
for oid in oids {
if let Ok(oid_obj) = git2::Oid::from_str(oid) {
if git_repo.git_repo.find_tag(oid_obj).is_err() {
if let Ok(false) = git_repo.does_commit_exist(oid) {
missing_oids.push(oid)
}
}
}
}
if missing_oids.is_empty() {
return Ok(());
}
let server_url = git_server_url.parse::<CloneUrl>()?;
let protocols_to_attempt =
get_read_protocols_to_try(git_repo, &server_url, decoded_nostr_url, is_grasp_server);
let mut failed_protocols = vec![];
let mut success = false;
for protocol in &protocols_to_attempt {
term.write_line(
format!("fetching {} over {protocol}...", server_url.short_name(),).as_str(),
)?;
let formatted_url = server_url.format_as(protocol)?;
let res = fetch_from_git_server_url(
&git_repo.git_repo,
oids,
&formatted_url,
[ServerProtocol::UnauthHttps, ServerProtocol::UnauthHttp].contains(protocol),
decoded_nostr_url.ssh_key_file_path().as_ref(),
term,
);
if let Err(error) = res {
term.write_line(&format!(
"fetch: {formatted_url} failed over {protocol}{}: {error}",
if protocol == &ServerProtocol::Ssh {
if let Some(ssh_key_file) = &decoded_nostr_url.ssh_key_file_path() {
format!(" with ssh key from {ssh_key_file}")
} else {
String::new()
}
} else {
String::new()
}
))?;
failed_protocols.push(protocol);
} else {
success = true;
if !failed_protocols.is_empty() {
term.write_line(format!("fetch: succeeded over {protocol}").as_str())?;
let _ = set_protocol_preference(git_repo, protocol, &server_url, &Direction::Push);
}
break;
}
}
if success {
Ok(())
} else {
let error = anyhow!(
"{} failed over {}{}",
server_url.short_name(),
join_with_and(&failed_protocols),
if decoded_nostr_url.protocol.is_some() {
" and nostr url contains protocol override so no other protocols were attempted"
} else {
""
},
);
term.write_line(format!("fetch: {error}").as_str())?;
Err(error)
}
}
fn fetch_from_git_server_url(
git_repo: &Repository,
oids: &[String],
git_server_url: &str,
dont_authenticate: bool,
ssh_key_file: Option<&String>,
term: &console::Term,
) -> Result<()> {
if git_server_url.parse::<CloneUrl>()?.protocol() == ServerProtocol::Ssh && !check_ssh_keys() {
bail!("no ssh keys found");
}
let git_config = git_repo.config()?;
let mut git_server_remote = git_repo.remote_anonymous(git_server_url)?;
let auth = {
if dont_authenticate {
GitAuthenticator::default()
} else if git_server_url.contains("git@") {
if let Some(ssh_key_file) = ssh_key_file {
GitAuthenticator::default()
.add_ssh_key_from_file(PathBuf::from_str(ssh_key_file)?, None)
} else {
GitAuthenticator::default()
}
} else {
GitAuthenticator::default()
}
};
let mut fetch_options = git2::FetchOptions::new();
let mut remote_callbacks = git2::RemoteCallbacks::new();
let fetch_reporter = Arc::new(Mutex::new(FetchReporter::new(term)));
remote_callbacks.sideband_progress({
let fetch_reporter = Arc::clone(&fetch_reporter);
move |data| {
let mut reporter = fetch_reporter.lock().unwrap();
reporter.process_remote_msg(data);
true
}
});
remote_callbacks.transfer_progress({
let fetch_reporter = Arc::clone(&fetch_reporter);
move |stats| {
let mut reporter = fetch_reporter.lock().unwrap();
reporter.process_transfer_progress_update(&stats);
true
}
});
if !dont_authenticate {
remote_callbacks.credentials(auth.credentials(&git_config));
}
fetch_options.remote_callbacks(remote_callbacks);
git_server_remote.download(oids, Some(&mut fetch_options))?;
git_server_remote.disconnect()?;
Ok(())
}
struct FetchReporter<'a> {
remote_msgs: Vec<String>,
transfer_progress_msgs: Vec<String>,
term: &'a console::Term,
start_time: Option<Instant>,
end_time: Option<Instant>,
}
impl<'a> FetchReporter<'a> {
fn new(term: &'a console::Term) -> Self {
Self {
remote_msgs: vec![],
transfer_progress_msgs: vec![],
term,
start_time: None,
end_time: None,
}
}
fn write_all(&self, lines_to_clear: usize) {
let _ = self.term.clear_last_lines(lines_to_clear);
for msg in &self.remote_msgs {
let _ = self.term.write_line(format!("remote: {msg}").as_str());
}
for msg in &self.transfer_progress_msgs {
let _ = self.term.write_line(msg);
}
}
fn count_all_existing_lines(&self) -> usize {
let width = self.term.size().1;
count_lines_per_msg_vec(width, &self.remote_msgs, "remote: ".len())
+ count_lines_per_msg_vec(width, &self.transfer_progress_msgs, 0)
}
fn just_write_transfer_progress(&self, lines_to_clear: usize) {
let _ = self.term.clear_last_lines(lines_to_clear);
for msg in &self.transfer_progress_msgs {
let _ = self.term.write_line(msg);
}
}
fn just_count_transfer_progress(&self) -> usize {
let width = self.term.size().1;
count_lines_per_msg_vec(width, &self.transfer_progress_msgs, 0)
}
fn process_remote_msg(&mut self, data: &[u8]) {
if let Ok(data) = str::from_utf8(data) {
let data = data
.split(['\n', '\r'])
.map(str::trim)
.filter(|line| !line.trim().is_empty())
.collect::<Vec<&str>>();
for data in data {
let existing_lines = self.count_all_existing_lines();
let msg = data.to_string();
if let Some(last) = self.remote_msgs.last() {
if (last.starts_with("Enume") && !last.ends_with(", done."))
|| ((last.starts_with("Compre") || last.starts_with("Count"))
&& !last.contains(')'))
{
let last = self.remote_msgs.pop().unwrap();
self.remote_msgs.push(format!("{last}{msg}"));
} else if (last.contains('%') && !last.contains("100%"))
|| last == &msg.replace(", done.", "")
{
self.remote_msgs.pop();
self.remote_msgs.push(msg);
} else {
self.remote_msgs.push(msg);
}
} else {
self.remote_msgs.push(msg);
}
self.write_all(existing_lines);
}
}
}
fn process_transfer_progress_update(&mut self, progress_stats: &git2::Progress<'_>) {
if self.start_time.is_none() {
self.start_time = Some(Instant::now());
}
let existing_lines = self.just_count_transfer_progress();
let updated = report_on_transfer_progress(
progress_stats,
&self.start_time.unwrap(),
self.end_time.as_ref(),
);
if self.transfer_progress_msgs.len() <= updated.len() {
if self.end_time.is_none() && updated.first().is_some_and(|f| f.contains("100%")) {
self.end_time = Some(Instant::now());
}
self.transfer_progress_msgs = updated;
}
self.just_write_transfer_progress(existing_lines);
}
}
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::float_cmp)]
#[allow(clippy::needless_pass_by_value)]
fn report_on_transfer_progress(
progress_stats: &Progress<'_>,
start_time: &Instant,
end_time: Option<&Instant>,
) -> Vec<String> {
let mut report = vec![];
let total = progress_stats.total_objects() as f64;
if total == 0.0 {
return report;
}
let received = progress_stats.received_objects() as f64;
let percentage = ((received / total) * 100.0)
.floor();
let received_bytes = progress_stats.received_bytes() as f64;
let (size, unit) = if received_bytes >= (1024.0 * 1024.0) {
(received_bytes / (1024.0 * 1024.0), "MiB")
} else {
(received_bytes / 1024.0, "KiB")
};
let speed = {
let duration = if let Some(end_time) = end_time {
(*end_time - *start_time).as_millis() as f64
} else {
start_time.elapsed().as_millis() as f64
};
if duration > 0.0 {
(received_bytes / (1024.0 * 1024.0)) / (duration / 1000.0) } else {
0.0
}
};
report.push(format!(
"Receiving objects: {percentage}% ({received}/{total}) {size:.2} {unit} | {speed:.2} MiB/s{}",
if received == total {
", done."
} else { ""},
));
if received == total {
let indexed_deltas = progress_stats.indexed_deltas() as f64;
let total_deltas = progress_stats.total_deltas() as f64;
let percentage = ((indexed_deltas / total_deltas) * 100.0)
.floor();
if total_deltas > 0.0 {
report.push(format!(
"Resolving deltas: {percentage}% ({indexed_deltas}/{total_deltas}){}",
if indexed_deltas == total_deltas {
", done."
} else {
""
},
));
}
}
report
}
#[cfg(test)]
mod tests {
use super::*;
fn pass_through_fetch_reporter_proces_remote_msg(msgs: Vec<&str>) -> Vec<String> {
let term = console::Term::stdout();
let mut reporter = FetchReporter::new(&term);
for msg in msgs {
reporter.process_remote_msg(msg.as_bytes());
}
reporter.remote_msgs
}
#[test]
fn logs_single_msg() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
]),
vec!["Enumerating objects: 23716, done."]
);
}
#[test]
fn logs_multiple_msgs() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]
);
}
mod ignores {
use super::*;
#[test]
fn empty_msgs() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"",
"Counting objects: 0% (1/2195)",
"",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]
);
}
#[test]
fn whitespace_msgs() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
" ",
"Counting objects: 0% (1/2195)",
" \r\n \r",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]
);
}
}
mod splits {
use super::*;
#[test]
fn multiple_lines_in_single_msg() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.\r\nCounting objects: 0% (1/2195)",
"",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]
);
}
}
mod joins_lines_sent_over_multiple_msgs {
use super::*;
#[test]
fn enumerating() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerat",
"ing objec",
"ts: 23716, done.",
"Counting objects: 0% (1/2195)",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
]
);
}
#[test]
fn counting() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"Counting obj",
"ects: 0% (1/2195)",
"Count",
"ing objects: 1% (22/",
"2195)",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 1% (22/2195)",
]
);
}
#[test]
fn compressing() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Compress",
"ing obj",
"ect",
"s: 0% (1/56",
"0)"
]),
vec!["Compressing objects: 0% (1/560)"]
);
}
}
#[test]
fn msgs_with_pc_and_not_100pc_are_replaced() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
"Counting objects: 1% (22/2195)",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 1% (22/2195)",
]
);
}
mod msgs_with_pc_100pc_are_not_replaced {
use super::*;
#[test]
fn when_next_msg_is_not_identical_but_with_done() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
"Counting objects: 1% (22/2195)",
"Counting objects: 100% (2195/2195)",
"Compressing objects: 0% (1/560)"
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 100% (2195/2195)",
"Compressing objects: 0% (1/560)"
]
);
}
#[test]
fn but_is_when_next_msg_is_identical_but_with_done_appended() {
assert_eq!(
pass_through_fetch_reporter_proces_remote_msg(vec![
"Enumerating objects: 23716, done.",
"Counting objects: 0% (1/2195)",
"Counting objects: 1% (22/2195)",
"Counting objects: 100% (2195/2195)",
"Counting objects: 100% (2195/2195), done.",
]),
vec![
"Enumerating objects: 23716, done.",
"Counting objects: 100% (2195/2195), done.",
]
);
}
}
}