use crate::log_feature;
use crate::logging::features::LogFeature;
use crate::storage::UploadStorage;
use actix_multipart::Multipart;
use actix_web::HttpResponse;
use futures_util::StreamExt;
use serde_json::json;
use std::path::PathBuf;
#[cfg(feature = "aws-backend")]
use tokio::fs;
/// Parsed and validated result of a multipart upload request.
#[derive(Debug)]
pub struct UploadFormData {
/// Local filesystem path where the file bytes can be read for processing.
/// For S3-backed storage this is a copy in the OS temp directory.
pub file_path: PathBuf,
/// NOTE(review): despite the name, for direct uploads this is set to the
/// hash-prefixed storage name ("{sha256[..16]}_{client_filename}") returned
/// by `save_uploaded_file`, not the client's raw filename — confirm intended.
pub original_filename: String,
/// From the `autoExecute` form field; defaults to `true` when absent/unparseable.
pub auto_execute: bool,
/// From the `trustDistance` form field; defaults to `0` when absent/unparseable.
pub trust_distance: u32,
/// From the `pubKey` form field; defaults to `"default"` when absent.
pub pub_key: String,
/// `true` when the uploaded content hash matched an already-stored file
/// (duplicate upload detected by the storage layer).
pub already_exists: bool,
}
/// Parses a multipart upload form into an [`UploadFormData`].
///
/// Recognized fields (unknown fields are ignored):
/// - `file` — the uploaded bytes, persisted via `upload_storage` with
///   content-hash deduplication.
/// - `s3FilePath` (aws-backend builds only) — an `s3://bucket/key` reference
///   to download instead of a direct upload; mutually exclusive with `file`.
/// - `autoExecute` — bool, defaults to `true`.
/// - `trustDistance` — u32, defaults to `0`.
/// - `pubKey` — string, defaults to `"default"`.
///
/// On failure returns a ready-to-send `HttpResponse` (400 for malformed
/// input, 500 from the save/download helpers on storage failures).
pub async fn parse_multipart(
mut payload: Multipart,
upload_storage: &UploadStorage,
) -> Result<UploadFormData, HttpResponse> {
let mut file_path: Option<PathBuf> = None;
let mut original_filename: Option<String> = None;
let mut already_exists = false;
let mut auto_execute = true;
let mut trust_distance = 0;
let mut pub_key = "default".to_string();
// Only collected on aws-backend builds; resolved after the field loop so
// ordering of `file` vs `s3FilePath` in the request doesn't matter.
#[cfg(feature = "aws-backend")]
let mut s3_file_path: Option<String> = None;
// Walk every multipart field in arrival order.
while let Some(item) = payload.next().await {
let mut field = match item {
Ok(field) => field,
Err(e) => {
log_feature!(
LogFeature::Ingestion,
error,
"Failed to read multipart field: {}",
e
);
return Err(HttpResponse::BadRequest().json(json!({
"success": false,
"error": format!("Failed to read multipart data: {}", e)
})));
}
};
// Owned copy of the Content-Disposition name so `field` can be
// mutably borrowed by the parse helpers below.
let field_name = field
.content_disposition()
.get_name()
.map(|s| s.to_string());
match field_name.as_deref() {
Some("file") => {
// Note: `filename` here is the hash-prefixed storage name, not
// the client's raw filename (see save_uploaded_file).
let (path, filename, exists) = save_uploaded_file(field, upload_storage).await?;
file_path = Some(path);
original_filename = Some(filename);
already_exists = exists;
}
#[cfg(feature = "aws-backend")]
Some("s3FilePath") => {
s3_file_path = parse_field_as_string(&mut field).await;
}
Some("autoExecute") => {
auto_execute = parse_field_as_bool(&mut field).await.unwrap_or(true);
}
Some("trustDistance") => {
trust_distance = parse_field_as_u32(&mut field).await.unwrap_or(0);
}
Some("pubKey") => {
pub_key = parse_field_as_string(&mut field)
.await
.unwrap_or_else(|| "default".to_string());
}
// Unknown fields are deliberately skipped.
_ => {}
}
}
// Resolve an S3 reference, if any, after all fields have been read.
#[cfg(feature = "aws-backend")]
if let Some(s3_path) = s3_file_path {
// Supplying both a direct upload and an S3 reference is ambiguous.
if file_path.is_some() {
log_feature!(
LogFeature::Ingestion,
error,
"Both file and s3FilePath provided - only one is allowed"
);
return Err(HttpResponse::BadRequest().json(json!({
"success": false,
"error": "Cannot provide both 'file' and 's3FilePath' - use one or the other"
})));
}
log_feature!(
LogFeature::Ingestion,
info,
"Processing S3 file path: {}",
s3_path
);
let (path, filename) = handle_s3_file_path(&s3_path, upload_storage).await?;
file_path = Some(path);
original_filename = Some(filename);
// S3 references are never treated as duplicate uploads.
already_exists = false; }
// A file must have arrived via one of the two paths above.
let file_path = match file_path {
Some(path) => path,
None => {
log_feature!(LogFeature::Ingestion, error, "No file provided in upload");
return Err(HttpResponse::BadRequest().json(json!({
"success": false,
"error": "No file provided"
})));
}
};
let original_filename = original_filename.unwrap_or_else(|| "unknown".to_string());
Ok(UploadFormData {
file_path,
original_filename,
auto_execute,
trust_distance,
pub_key,
already_exists,
})
}
/// Streams an uploaded multipart `file` field into storage with
/// content-hash deduplication.
///
/// The stored name is `"{first 16 hex chars of SHA-256}_{client filename}"`,
/// so identical content under the same filename maps to the same storage key.
/// Returns `(local_path, storage_filename, already_existed)`.
///
/// NOTE(review): the entire file is buffered in memory (`file_data`) before
/// saving — confirm acceptable for the expected upload sizes.
async fn save_uploaded_file(
mut field: actix_multipart::Field,
upload_storage: &UploadStorage,
) -> Result<(PathBuf, String, bool), HttpResponse> {
use sha2::{Digest, Sha256};
let filename = field
.content_disposition()
.get_filename()
.unwrap_or("uploaded_file")
.to_string();
// Hash incrementally while accumulating the full payload.
let mut hasher = Sha256::new();
let mut file_data = Vec::new();
while let Some(chunk) = field.next().await {
let data = match chunk {
Ok(data) => data,
Err(e) => {
log_feature!(
LogFeature::Ingestion,
error,
"Failed to read file chunk: {}",
e
);
return Err(HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to read file: {}", e)
})));
}
};
hasher.update(&data);
file_data.extend_from_slice(&data);
}
let hash_result = hasher.finalize();
let hash_hex = format!("{:x}", hash_result);
// 16 hex chars (64 bits) of the digest is the dedup prefix.
let short_hash = &hash_hex[..16]; let unique_filename = format!("{}_{}", short_hash, &filename);
// The storage layer reports whether this exact name was already present.
let (storage_path, already_exists) = match upload_storage
.save_file_if_not_exists(&unique_filename, &file_data, None)
.await
{
Ok((path, exists)) => (path, exists),
Err(e) => {
log_feature!(LogFeature::Ingestion, error, "Failed to save file: {}", e);
return Err(HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to save file: {}", e)
})));
}
};
// Duplicate upload: reuse the existing stored path, skip the S3 temp copy.
if already_exists {
let filepath = storage_path;
log_feature!(
LogFeature::Ingestion,
info,
"File already exists (duplicate upload): {} at {}",
unique_filename,
upload_storage.get_display_path(&unique_filename, None)
);
return Ok((filepath, unique_filename, true));
}
let filepath = match upload_storage {
UploadStorage::Local { .. } => {
log_feature!(
LogFeature::Ingestion,
info,
"File saved to local storage: {} at {}",
unique_filename,
upload_storage.get_display_path(&unique_filename, None)
);
storage_path
}
// S3 storage: downstream processing needs a local file, so mirror the
// bytes into the OS temp directory and return that path instead.
#[cfg(feature = "aws-backend")]
UploadStorage::S3 { .. } => {
let temp_path = std::env::temp_dir().join(&unique_filename);
if let Err(e) = fs::write(&temp_path, &file_data).await {
log_feature!(
LogFeature::Ingestion,
error,
"Failed to write S3-uploaded file to /tmp: {}",
e
);
return Err(HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to write file to temp directory: {}", e)
})));
}
log_feature!(
LogFeature::Ingestion,
info,
"File saved to S3: {} and copied to temp for processing",
upload_storage.get_display_path(&unique_filename, None)
);
temp_path
}
};
log_feature!(
LogFeature::Ingestion,
info,
"File ready for processing (new upload): {}",
unique_filename
);
Ok((filepath, unique_filename, false))
}
async fn parse_field_as_bool(field: &mut actix_multipart::Field) -> Option<bool> {
let mut bytes = Vec::new();
while let Some(chunk) = field.next().await {
if let Ok(data) = chunk {
bytes.extend_from_slice(&data);
}
}
String::from_utf8(bytes).ok()?.parse().ok()
}
async fn parse_field_as_u32(field: &mut actix_multipart::Field) -> Option<u32> {
let mut bytes = Vec::new();
while let Some(chunk) = field.next().await {
if let Ok(data) = chunk {
bytes.extend_from_slice(&data);
}
}
String::from_utf8(bytes).ok()?.parse().ok()
}
async fn parse_field_as_string(field: &mut actix_multipart::Field) -> Option<String> {
let mut bytes = Vec::new();
while let Some(chunk) = field.next().await {
if let Ok(data) = chunk {
bytes.extend_from_slice(&data);
}
}
String::from_utf8(bytes).ok()
}
/// Downloads an `s3://bucket/key` object and stages it in the OS temp
/// directory for processing (aws-backend builds only).
///
/// Validates the `s3://` prefix and that a key follows the bucket, then
/// delegates the download to `upload_storage.download_from_s3_path`.
/// Returns `(temp_path, filename)` where `filename` is the last `/`-separated
/// segment of the key. Errors are ready-to-send responses: 400 for a
/// malformed path, 500 for download/write failures.
///
/// NOTE(review): a key ending in '/' yields an empty filename, making
/// `temp_path` the temp directory itself and the write fail — consider
/// rejecting such keys up front.
#[cfg(feature = "aws-backend")]
async fn handle_s3_file_path(
s3_path: &str,
upload_storage: &UploadStorage,
) -> Result<(PathBuf, String), HttpResponse> {
if !s3_path.starts_with("s3://") {
log_feature!(
LogFeature::Ingestion,
error,
"Invalid S3 path format: {}",
s3_path
);
return Err(HttpResponse::BadRequest().json(json!({
"success": false,
"error": format!("Invalid S3 path format. Expected 's3://bucket/key', got: {}", s3_path)
})));
}
// Strip "s3://" (5 bytes, safe after the prefix check) and split into
// bucket + key at the first '/'.
let path_without_prefix = &s3_path[5..]; let parts: Vec<&str> = path_without_prefix.splitn(2, '/').collect();
if parts.len() != 2 {
log_feature!(
LogFeature::Ingestion,
error,
"Invalid S3 path structure: {}",
s3_path
);
return Err(HttpResponse::BadRequest().json(json!({
"success": false,
"error": format!("Invalid S3 path. Expected 's3://bucket/key', got: {}", s3_path)
})));
}
let bucket = parts[0];
let key = parts[1];
// Last path segment of the key becomes the local filename.
let filename = key.rsplit('/').next().unwrap_or(key).to_string();
log_feature!(
LogFeature::Ingestion,
info,
"Downloading S3 file: bucket={}, key={}, filename={}",
bucket,
key,
filename
);
let file_data = match upload_storage.download_from_s3_path(bucket, key).await {
Ok(data) => data,
Err(e) => {
log_feature!(
LogFeature::Ingestion,
error,
"Failed to download S3 file: {}",
e
);
return Err(HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to download S3 file: {}", e)
})));
}
};
let temp_path = std::env::temp_dir().join(&filename);
if let Err(e) = fs::write(&temp_path, &file_data).await {
log_feature!(
LogFeature::Ingestion,
error,
"Failed to write S3 file to /tmp: {}",
e
);
return Err(HttpResponse::InternalServerError().json(json!({
"success": false,
"error": format!("Failed to write file to temp directory: {}", e)
})));
}
log_feature!(
LogFeature::Ingestion,
info,
"S3 file downloaded to /tmp for processing: {:?}",
temp_path
);
Ok((temp_path, filename))
}
#[cfg(test)]
mod tests {
    use sha2::{Digest, Sha256};

    /// Hex-encoded SHA-256 of `data` (mirrors the hashing in `save_uploaded_file`).
    fn sha256_hex(data: &[u8]) -> String {
        format!("{:x}", Sha256::digest(data))
    }

    /// The storage name is "{16-hex-char prefix}_{original filename}".
    #[test]
    fn test_unique_filename_format() {
        let hash_hex = sha256_hex(b"test file content");
        let original = "tweets.js";
        let unique = format!("{}_{}", &hash_hex[..16], original);

        assert!(unique.contains('_'));
        assert!(unique.ends_with("tweets.js"));

        let parts: Vec<&str> = unique.splitn(2, '_').collect();
        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].len(), 16);
        assert_eq!(parts[1], original);
    }

    /// Identical content must always produce an identical digest.
    #[test]
    fn test_hash_consistency() {
        let content = b"identical content";
        assert_eq!(sha256_hex(content), sha256_hex(content));
    }

    /// Different content must produce different digests.
    #[test]
    fn test_hash_uniqueness() {
        assert_ne!(sha256_hex(b"content one"), sha256_hex(b"content two"));
    }
}