use base64::decode;
use eyre::{eyre, Result};
use futures::{
stream::{StreamExt, TryStreamExt},
Stream
};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::to_value;
use std::{fs::{self}, io::Cursor, path::PathBuf};
use std::{
fs::File,
io::{self, ErrorKind, Read, Write},
};
use tokio::sync::mpsc;
use tokio_util::bytes::Bytes;
const DEBUG: bool = false;
pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> Result<()> {
let image = &node.token.image;
let name = match &node.token.name {
Some(name) => name,
None => return Err(eyre!("Image data not found for {:#?}", node)),
};
let (url, mime) = match image {
NftImage::Object {
url,
mime_type,
size: _,
} => (url, mime_type),
NftImage::Url(url) => (url, &None), _ => return Err(eyre!("No image URL found for {name}")),
};
let extension = if url.starts_with("data:image/svg") {
"svg".to_string()
} else if let Some(mime) = mime {
mime.rsplit("/").next().unwrap_or_default().to_string()
} else {
url.rsplit('.').next().unwrap_or_default().to_lowercase()
};
let file_path = dir.join(format!("{name}.{extension}"));
if file_path.is_file() {
if DEBUG {
println!("Skipping {name}");
}
return Ok(());
}
if DEBUG {
println!("Downloading {name} to {:?}", file_path);
}
let (progress_tx, mut _progress_rx) = mpsc::channel(10); match url {
url if url.starts_with("data:image/svg") => save_base64_image(
&url.strip_prefix("data:image/svg+xml;base64,")
.unwrap_or(&url),
file_path,
)?,
url if url.starts_with("ipfs") => {
let parts: Vec<&str> = url.split('/').collect();
if let Some(hash) = parts.iter().find(|&&part| part.starts_with("Qm")) {
let ipfs_url = format!("https://ipfs.io/ipfs/{hash}");
if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await {
return Err(eyre::eyre!("Error downloading image {}: {}", name, error));
}
}
}
url => {
if let Err(error) = download_image(&client, &url, file_path, progress_tx).await {
return Err(eyre::eyre!("Error downloading image {}: {}", name, error));
};
}
}
if DEBUG {
println!("{name} saved successfully");
}
Ok(())
}
pub struct DownloadProgress {
pub name: String,
pub progress: u64,
pub total: u64,
}
#[derive(Debug)]
struct DownloadResult {
file_path: std::path::PathBuf,
progress: u64,
total: u64,
}
struct ProgressTracker {
progress: u64,
}
impl ProgressTracker {
fn new() -> Self {
ProgressTracker { progress: 0 }
}
async fn track_progress<R: Stream<Item = Result<Bytes>> + Unpin>(
&mut self,
index: usize,
mut reader_stream: R,
mut file: File,
progress_tx: &mpsc::Sender<(usize, u64)>,
) -> Result<()> {
let mut buffer = [0; 8192];
while let Some(chunk_result) = reader_stream.next().await {
let chunk = match chunk_result {
Ok(chunk) => chunk,
Err(e) => return Err(e.into()),
};
let mut cursor = Cursor::new(chunk);
let bytes_read = cursor.read(&mut buffer)?;
file.write_all(&buffer[..bytes_read])?;
self.progress += bytes_read as u64;
match progress_tx.try_send((index, self.progress)) {
Ok(_) => {
}
Err(mpsc::error::TrySendError::Full(_)) => {
}
Err(mpsc::error::TrySendError::Closed(_)) => {
break;
}
}
}
Ok(())
}
}
async fn download_image(
client: &Client,
image_url: &str,
file_path: PathBuf,
progress_tx: mpsc::Sender<(u64, u64)>,
) -> Result<()> {
let response = client.get(image_url).send().await?;
let content_length = response.content_length().unwrap_or(0);
let mut byte_stream = response.bytes_stream();
let mut progress: u64 = 0;
let mut file = File::create(file_path)?;
while let Some(chunk) = byte_stream.next().await {
let chunk = chunk?;
let chunk_len = chunk.len();
progress += chunk_len as u64;
file.write_all(&chunk)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let _ = progress_tx.send((progress, content_length)).await;
}
if content_length != progress {
return Err(eyre::eyre!(
"Downloaded file size does not match the expected size"
));
}
Ok(())
}
pub async fn create_directory(dir_path: &PathBuf) -> Result<PathBuf>
{
let res = match fs::metadata(dir_path) {
Ok(metadata) => {
if !metadata.is_dir() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
format!("{:?} is not a directory", dir_path),
)
.into());
}
dir_path.to_path_buf()
}
Err(e) if e.kind() == ErrorKind::NotFound => {
fs::create_dir_all(dir_path)?;
if DEBUG { println!("created directory: {:?}", dir_path);}
dir_path.to_path_buf()
}
Err(e) => {
return Err(e.into());
}
};
Ok(res)
}
fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> {
let decoded_data = decode(base64_data)?;
let mut file = File::create(file_path)?;
file.write_all(&decoded_data)?;
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
#[serde(rename_all = "camelCase")]
pub enum NftImage {
Null,
Url(String),
Object {
url: String,
size: Option<serde_json::Value>,
mime_type: Option<String>,
},
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct NftToken {
pub image: NftImage,
pub name: Option<String>,
pub collection_name: Option<String>,
pub token_url: Option<String>,
pub token_id: Option<String>,
pub metadata: Option<serde_json::Value>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NftNode {
token: NftToken,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NftTokens {
pub nodes: Vec<NftNode>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NftData {
pub tokens: NftTokens,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NftResponse {
pub data: NftData,
}
impl NftResponse {
pub async fn request(address: &str) -> Result<NftResponse> {
let query = format!(
r#"
query NFTsForAddress {{
tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}],
pagination: {{limit: 32}},
where: {{ownerAddresses: "{}"}}) {{
nodes {{
token {{
tokenId
tokenUrl
collectionName
name
image {{
url
size
mimeType
}}
metadata
}}
}}
}}
}}
"#,
address
);
let request_body = to_value(serde_json::json!({
"query": query,
"variables": null,
}))?;
let response = Client::new()
.post("https://api.zora.co/graphql")
.json(&request_body)
.send()
.await
.map_err(|err| eyre!("Failed to send request: {}", err))?;
let mut response_body = response.bytes_stream();
let mut response_data = Vec::new();
while let Some(item) = response_body.next().await {
let chunk = item.map_err(|err| eyre!("Failed to read response: {}", err))?;
response_data.extend_from_slice(&chunk);
}
let response_str = String::from_utf8(response_data)
.map_err(|err| eyre!("Failed to convert response to string: {}", err))?;
if DEBUG {
println!("{}", &response_str);
}
let response: NftResponse = serde_json::from_str(&response_str)
.map_err(|err| eyre!("Failed to parse JSON response: {}", err))?;
if DEBUG {
println!("{:#?}", &response.data.tokens.nodes);
}
Ok(response)
}
}
#[cfg(test)]
mod tests {
}