use crate::config::{FileStorageConfig, S3Config};
use crate::utils::error::{GatewayError, Result};
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, info};
use uuid::Uuid;
#[cfg(feature = "s3")]
use aws_sdk_s3 as aws_s3;
#[derive(Debug, Clone)]
pub enum FileStorage {
Local(LocalStorage),
S3(S3Storage),
}
#[derive(Debug, Clone)]
pub struct LocalStorage {
base_path: PathBuf,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct S3Storage {
bucket: String,
region: String,
#[cfg(feature = "s3")]
client: Option<aws_s3::Client>,
#[cfg(not(feature = "s3"))]
client: Option<()>, }
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FileMetadata {
pub id: String,
pub filename: String,
pub content_type: String,
pub size: u64,
pub created_at: chrono::DateTime<chrono::Utc>,
pub checksum: String,
}
#[allow(dead_code)]
impl FileStorage {
pub async fn new(config: &FileStorageConfig) -> Result<Self> {
info!("Initializing file storage: {}", config.storage_type);
match config.storage_type.as_str() {
"local" => {
let path = config
.local_path
.as_ref()
.ok_or_else(|| GatewayError::Config("Local path not specified".to_string()))?;
Ok(FileStorage::Local(LocalStorage::new(path).await?))
}
"s3" => {
let s3_config = config.s3.as_ref().ok_or_else(|| {
GatewayError::Config("S3 configuration not specified".to_string())
})?;
Ok(FileStorage::S3(S3Storage::new(s3_config).await?))
}
_ => Err(GatewayError::Config(format!(
"Unsupported storage type: {}",
config.storage_type
))),
}
}
pub async fn store(&self, filename: &str, content: &[u8]) -> Result<String> {
match self {
FileStorage::Local(storage) => storage.store(filename, content).await,
FileStorage::S3(storage) => storage.store(filename, content).await,
}
}
pub async fn get(&self, file_id: &str) -> Result<Vec<u8>> {
match self {
FileStorage::Local(storage) => storage.get(file_id).await,
FileStorage::S3(storage) => storage.get(file_id).await,
}
}
pub async fn delete(&self, file_id: &str) -> Result<()> {
match self {
FileStorage::Local(storage) => storage.delete(file_id).await,
FileStorage::S3(storage) => storage.delete(file_id).await,
}
}
pub async fn exists(&self, file_id: &str) -> Result<bool> {
match self {
FileStorage::Local(storage) => storage.exists(file_id).await,
FileStorage::S3(storage) => storage.exists(file_id).await,
}
}
pub async fn metadata(&self, file_id: &str) -> Result<FileMetadata> {
match self {
FileStorage::Local(storage) => storage.metadata(file_id).await,
FileStorage::S3(storage) => storage.metadata(file_id).await,
}
}
pub async fn list(&self, prefix: Option<&str>, limit: Option<usize>) -> Result<Vec<String>> {
match self {
FileStorage::Local(storage) => storage.list(prefix, limit).await,
FileStorage::S3(storage) => storage.list(prefix, limit).await,
}
}
pub async fn health_check(&self) -> Result<()> {
match self {
FileStorage::Local(storage) => storage.health_check().await,
FileStorage::S3(storage) => storage.health_check().await,
}
}
pub async fn close(&self) -> Result<()> {
match self {
FileStorage::Local(storage) => storage.close().await,
FileStorage::S3(storage) => storage.close().await,
}
}
}
#[allow(dead_code)]
impl LocalStorage {
pub async fn new(base_path: &str) -> Result<Self> {
let path = PathBuf::from(base_path);
if !path.exists() {
fs::create_dir_all(&path).await.map_err(|e| {
GatewayError::FileStorage(format!("Failed to create storage directory: {}", e))
})?;
}
info!("Local file storage initialized at: {}", path.display());
Ok(Self { base_path: path })
}
pub async fn store(&self, filename: &str, content: &[u8]) -> Result<String> {
let file_id = Uuid::new_v4().to_string();
let file_path = self.get_file_path(&file_id);
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent).await.map_err(|e| {
GatewayError::FileStorage(format!("Failed to create directory: {}", e))
})?;
}
let mut file = fs::File::create(&file_path)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to create file: {}", e)))?;
file.write_all(content)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to write file: {}", e)))?;
let metadata = FileMetadata {
id: file_id.clone(),
filename: filename.to_string(),
content_type: Self::detect_content_type(filename),
size: content.len() as u64,
created_at: chrono::Utc::now(),
checksum: Self::calculate_checksum(content),
};
self.store_metadata(&file_id, &metadata).await?;
debug!("File stored: {} -> {}", filename, file_id);
Ok(file_id)
}
pub async fn get(&self, file_id: &str) -> Result<Vec<u8>> {
let file_path = self.get_file_path(file_id);
if !file_path.exists() {
return Err(GatewayError::NotFound(format!(
"File not found: {}",
file_id
)));
}
let mut file = fs::File::open(&file_path)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to open file: {}", e)))?;
let mut content = Vec::new();
file.read_to_end(&mut content)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to read file: {}", e)))?;
Ok(content)
}
pub async fn delete(&self, file_id: &str) -> Result<()> {
let file_path = self.get_file_path(file_id);
let metadata_path = self.get_metadata_path(file_id);
if file_path.exists() {
fs::remove_file(&file_path)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to delete file: {}", e)))?;
}
if metadata_path.exists() {
fs::remove_file(&metadata_path).await.map_err(|e| {
GatewayError::FileStorage(format!("Failed to delete metadata: {}", e))
})?;
}
debug!("File deleted: {}", file_id);
Ok(())
}
pub async fn exists(&self, file_id: &str) -> Result<bool> {
let file_path = self.get_file_path(file_id);
Ok(file_path.exists())
}
pub async fn metadata(&self, file_id: &str) -> Result<FileMetadata> {
let metadata_path = self.get_metadata_path(file_id);
if !metadata_path.exists() {
return Err(GatewayError::NotFound(format!(
"File metadata not found: {}",
file_id
)));
}
let content = fs::read_to_string(&metadata_path)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to read metadata: {}", e)))?;
let metadata: FileMetadata = serde_json::from_str(&content)
.map_err(|e| GatewayError::FileStorage(format!("Failed to parse metadata: {}", e)))?;
Ok(metadata)
}
pub async fn list(&self, prefix: Option<&str>, limit: Option<usize>) -> Result<Vec<String>> {
let mut files = Vec::new();
let mut entries = fs::read_dir(&self.base_path)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to read directory: {}", e)))?;
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to read entry: {}", e)))?
{
let file_name = entry.file_name().to_string_lossy().to_string();
if file_name.ends_with(".meta") {
continue;
}
if let Some(prefix) = prefix {
if !file_name.starts_with(prefix) {
continue;
}
}
files.push(file_name);
if let Some(limit) = limit {
if files.len() >= limit {
break;
}
}
}
Ok(files)
}
pub async fn health_check(&self) -> Result<()> {
if !self.base_path.exists() {
return Err(GatewayError::FileStorage(
"Storage directory does not exist".to_string(),
));
}
let test_file = self.base_path.join(".health_check");
fs::write(&test_file, b"health_check")
.await
.map_err(|e| GatewayError::FileStorage(format!("Storage not writable: {}", e)))?;
let _ = fs::remove_file(&test_file).await;
Ok(())
}
pub async fn close(&self) -> Result<()> {
Ok(())
}
fn get_file_path(&self, file_id: &str) -> PathBuf {
let subdir = &file_id[..2.min(file_id.len())];
self.base_path.join(subdir).join(file_id)
}
fn get_metadata_path(&self, file_id: &str) -> PathBuf {
let subdir = &file_id[..2.min(file_id.len())];
self.base_path
.join(subdir)
.join(format!("{}.meta", file_id))
}
async fn store_metadata(&self, file_id: &str, metadata: &FileMetadata) -> Result<()> {
let metadata_path = self.get_metadata_path(file_id);
if let Some(parent) = metadata_path.parent() {
fs::create_dir_all(parent).await.map_err(|e| {
GatewayError::FileStorage(format!("Failed to create metadata directory: {}", e))
})?;
}
let content = serde_json::to_string_pretty(metadata).map_err(|e| {
GatewayError::FileStorage(format!("Failed to serialize metadata: {}", e))
})?;
fs::write(&metadata_path, content)
.await
.map_err(|e| GatewayError::FileStorage(format!("Failed to write metadata: {}", e)))?;
Ok(())
}
fn detect_content_type(filename: &str) -> String {
match Path::new(filename).extension().and_then(|ext| ext.to_str()) {
Some("txt") => "text/plain".to_string(),
Some("json") => "application/json".to_string(),
Some("xml") => "application/xml".to_string(),
Some("html") => "text/html".to_string(),
Some("css") => "text/css".to_string(),
Some("js") => "application/javascript".to_string(),
Some("png") => "image/png".to_string(),
Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
Some("gif") => "image/gif".to_string(),
Some("pdf") => "application/pdf".to_string(),
Some("zip") => "application/zip".to_string(),
_ => "application/octet-stream".to_string(),
}
}
fn calculate_checksum(content: &[u8]) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(content);
hex::encode(hasher.finalize())
}
}
#[allow(dead_code)]
impl S3Storage {
pub async fn new(config: &S3Config) -> Result<Self> {
info!(
"S3 file storage initialized: bucket={}, region={}",
config.bucket, config.region
);
#[cfg(feature = "s3")]
{
Ok(Self {
bucket: config.bucket.clone(),
region: config.region.clone(),
client: None, })
}
#[cfg(not(feature = "s3"))]
{
Ok(Self {
bucket: config.bucket.clone(),
region: config.region.clone(),
client: None,
})
}
}
pub async fn store(&self, _filename: &str, _content: &[u8]) -> Result<String> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn get(&self, _file_id: &str) -> Result<Vec<u8>> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn delete(&self, _file_id: &str) -> Result<()> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn exists(&self, _file_id: &str) -> Result<bool> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn metadata(&self, _file_id: &str) -> Result<FileMetadata> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn list(&self, _prefix: Option<&str>, _limit: Option<usize>) -> Result<Vec<String>> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn health_check(&self) -> Result<()> {
Err(GatewayError::FileStorage(
"S3 storage not implemented yet".to_string(),
))
}
pub async fn close(&self) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_local_storage() {
let temp_dir = TempDir::new().unwrap();
let storage = LocalStorage::new(temp_dir.path().to_str().unwrap())
.await
.unwrap();
let content = b"Hello, World!";
let file_id = storage.store("test.txt", content).await.unwrap();
assert!(!file_id.is_empty());
assert!(storage.exists(&file_id).await.unwrap());
let retrieved = storage.get(&file_id).await.unwrap();
assert_eq!(retrieved, content);
let metadata = storage.metadata(&file_id).await.unwrap();
assert_eq!(metadata.filename, "test.txt");
assert_eq!(metadata.size, content.len() as u64);
storage.delete(&file_id).await.unwrap();
assert!(!storage.exists(&file_id).await.unwrap());
}
#[test]
fn test_content_type_detection() {
assert_eq!(LocalStorage::detect_content_type("test.txt"), "text/plain");
assert_eq!(
LocalStorage::detect_content_type("data.json"),
"application/json"
);
assert_eq!(LocalStorage::detect_content_type("image.png"), "image/png");
assert_eq!(
LocalStorage::detect_content_type("unknown"),
"application/octet-stream"
);
}
}