use std::sync::Arc;
use futures::StreamExt;
use git_lfs_api::{Action, Actions};
use git_lfs_pointer::Oid;
use git_lfs_store::Store;
use reqwest::header::CONTENT_LENGTH;
use reqwest::{Body, Method, Response};
use serde::Serialize;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::io::{ReaderStream, StreamReader, SyncIoBridge};
use crate::error::TransferError;
use crate::event::Event;
const OCTET_STREAM: &str = "application/octet-stream";
#[derive(Debug, Serialize)]
struct VerifyBody<'a> {
oid: &'a str,
size: u64,
}
pub(crate) async fn download(
http: &reqwest::Client,
store: Arc<Store>,
oid: &str,
action: &Action,
events: Option<&UnboundedSender<Event>>,
) -> Result<u64, TransferError> {
let expected = Oid::from_hex(oid)?;
let mut req = http.request(Method::GET, &action.href);
for (k, v) in &action.header {
req = req.header(k, v);
}
let resp = req.send().await?;
check_status(&resp, &action.href)?;
let mut bytes_done: u64 = 0;
let oid_owned = oid.to_owned();
let events_clone = events.cloned();
let body_stream = resp.bytes_stream().map(move |chunk| {
if let Ok(ref c) = chunk {
bytes_done += c.len() as u64;
if let Some(s) = &events_clone {
let _ = s.send(Event::Progress {
oid: oid_owned.clone(),
bytes_done,
});
}
}
chunk.map_err(std::io::Error::other)
});
let async_reader = StreamReader::new(body_stream);
let mut bridge = SyncIoBridge::new(async_reader);
let size = tokio::task::spawn_blocking(move || store.insert_verified(expected, &mut bridge))
.await
.map_err(|join_err| std::io::Error::other(join_err.to_string()))??;
Ok(size)
}
pub(crate) async fn upload(
http: &reqwest::Client,
store: Arc<Store>,
oid: &str,
size: u64,
actions: &Actions,
events: Option<&UnboundedSender<Event>>,
) -> Result<(), TransferError> {
let upload_action = actions
.upload
.as_ref()
.ok_or_else(|| TransferError::Io(std::io::Error::other("missing upload action")))?;
let oid_parsed = Oid::from_hex(oid)?;
let path = store.object_path(oid_parsed);
let file = tokio::fs::File::open(&path).await?;
let mut bytes_done: u64 = 0;
let oid_owned = oid.to_owned();
let events_clone = events.cloned();
let stream = ReaderStream::new(file).map(move |chunk| {
if let Ok(ref c) = chunk {
bytes_done += c.len() as u64;
if let Some(s) = &events_clone {
let _ = s.send(Event::Progress {
oid: oid_owned.clone(),
bytes_done,
});
}
}
chunk
});
let body = Body::wrap_stream(stream);
let mut req = http
.request(Method::PUT, &upload_action.href)
.header(CONTENT_LENGTH, size);
let mut saw_content_type = false;
for (k, v) in &upload_action.header {
if k.eq_ignore_ascii_case("content-type") {
saw_content_type = true;
}
req = req.header(k, v);
}
if !saw_content_type {
req = req.header(reqwest::header::CONTENT_TYPE, OCTET_STREAM);
}
let resp = req.body(body).send().await?;
check_status(&resp, &upload_action.href)?;
if let Some(verify_action) = &actions.verify {
verify(http, oid, size, verify_action).await?;
}
Ok(())
}
async fn verify(
http: &reqwest::Client,
oid: &str,
size: u64,
action: &Action,
) -> Result<(), TransferError> {
let mut req = http
.request(Method::POST, &action.href)
.header(reqwest::header::ACCEPT, "application/vnd.git-lfs+json")
.header(
reqwest::header::CONTENT_TYPE,
"application/vnd.git-lfs+json",
);
for (k, v) in &action.header {
req = req.header(k, v);
}
let resp = req.json(&VerifyBody { oid, size }).send().await?;
check_status(&resp, &action.href)?;
Ok(())
}
fn check_status(resp: &Response, url: &str) -> Result<(), TransferError> {
if resp.status().is_success() {
Ok(())
} else {
Err(TransferError::ActionStatus {
status: resp.status().as_u16(),
url: strip_query(url).to_owned(),
})
}
}
fn strip_query(url: &str) -> &str {
url.split_once('?').map_or(url, |(base, _)| base)
}