use crate::bluesky::did::DidResolver;
use crate::error::AppError;
use futures::StreamExt;
use reqwest::Client;
use std::path::PathBuf;
use std::time::Duration;
use tracing::{debug, info};
pub struct RepositoryProvider {
client: Client,
cache_dir: PathBuf,
did_resolver: DidResolver,
}
impl RepositoryProvider {
pub fn new() -> Result<Self, AppError> {
let client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| AppError::HttpClientInitialization(e.to_string()))?;
let cache_dir = dirs::cache_dir()
.unwrap_or_else(std::env::temp_dir)
.join("autoreply")
.join("repos");
std::fs::create_dir_all(&cache_dir)
.map_err(|e| AppError::CacheError(format!("Failed to create cache directory: {}", e)))?;
let did_resolver = DidResolver::new();
Ok(Self {
client,
cache_dir,
did_resolver,
})
}
async fn fetch_repo_car(&self, did: &str) -> Result<PathBuf, AppError> {
let pds_endpoint = self.did_resolver.discover_pds(did).await?.ok_or_else(|| {
AppError::DidResolveFailed(format!("Could not determine PDS for DID {}", did))
})?;
let url = format!("{}/xrpc/com.atproto.sync.getRepo?did={}", pds_endpoint, did);
debug!("Fetching repo from URL: {}", url);
let cache_filename = format!("{}.car", did.replace(':', "_"));
let final_path = self.cache_dir.join(&cache_filename);
if final_path.exists() {
info!("Using cached repo for {}", did);
return Ok(final_path);
}
let temp_filename = format!("{}.tmp.{}", cache_filename, std::process::id());
let temp_path = self.cache_dir.join(&temp_filename);
let response = self.client
.get(&url)
.send()
.await
.map_err(|e| AppError::NetworkError(e.to_string()))?;
if !response.status().is_success() {
return Err(AppError::NetworkError(format!(
"Failed to fetch repo: {} {}",
response.status(),
response.text().await.unwrap_or_default()
)));
}
info!(
"Streaming repo for {} ({} bytes) to {}",
did,
response.content_length().unwrap_or(0),
temp_path.display()
);
let mut temp_file = tokio::fs::File::create(&temp_path)
.await
.map_err(|e| AppError::CacheError(format!("Failed to create temp file: {}", e)))?;
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| AppError::NetworkError(e.to_string()))?;
tokio::io::AsyncWriteExt::write_all(&mut temp_file, &chunk)
.await
.map_err(|e| AppError::CacheError(format!("Failed to write to temp file: {}", e)))?;
}
tokio::io::AsyncWriteExt::flush(&mut temp_file)
.await
.map_err(|e| AppError::CacheError(format!("Failed to flush temp file: {}", e)))?;
temp_file.sync_all()
.await
.map_err(|e| AppError::CacheError(format!("Failed to fsync temp file: {}", e)))?;
drop(temp_file);
std::fs::rename(&temp_path, &final_path)
.map_err(|e| AppError::CacheError(format!("Failed to atomically rename temp file: {}", e)))?;
info!("Successfully cached repo for {} at {}", did, final_path.display());
Ok(final_path)
}
pub async fn records(&self, did: &str) -> Result<crate::car::CarRecords, AppError> {
let car_file_path = self.fetch_repo_car(did).await?;
let car_bytes = tokio::fs::read(&car_file_path).await
.map_err(|e| AppError::CacheError(format!("Failed to read CAR file: {}", e)))?;
crate::car::CarRecords::from_bytes(car_bytes)
.map_err(|e| AppError::RepoParseFailed(format!("Failed to create CAR iterator: {}", e)))
}
}
impl Default for RepositoryProvider {
fn default() -> Self {
Self::new().expect("Failed to create default RepositoryProvider")
}
}