Skip to main content

jax_daemon/http_server/api/v0/bucket/
add.rs

1use axum::extract::{Multipart, State};
2use axum::response::{IntoResponse, Response};
3use reqwest::{Client, RequestBuilder, Url};
4use serde::{Deserialize, Serialize};
5use std::io::Cursor;
6use std::path::PathBuf;
7use uuid::Uuid;
8
9use common::prelude::{Link, MountError};
10
11use crate::http_server::api::client::ApiRequest;
12use crate::ServiceState;
13
// Parameters identifying where an uploaded file goes: the target bucket and the
// path inside it. The field names match the `bucket_id` / `mount_path` form
// fields that `handler` reads from the multipart body.
//
// NOTE: a plain `//` comment is used here on purpose. This type derives
// `clap::Args`, and clap's derive picks up `///` doc comments as CLI help
// text, so the `///` lines on the fields below are user-facing strings.
#[derive(Debug, Clone, Serialize, Deserialize, clap::Args)]
pub struct AddRequest {
    /// Bucket ID to add file to
    #[arg(long)]
    pub bucket_id: Uuid,

    /// Path in bucket where file should be mounted
    #[arg(long)]
    pub mount_path: String,
}
24
/// Outcome of attempting to add a single uploaded file to a bucket mount.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileUploadResult {
    /// Full path (base mount path joined with the uploaded filename) that was attempted.
    pub mount_path: String,
    /// MIME type guessed from the path's extension; left empty when the path
    /// was rejected before MIME detection ran.
    pub mime_type: String,
    /// Length of the uploaded file data in bytes.
    pub size: usize,
    /// `true` when the file was added to the mount, `false` otherwise.
    pub success: bool,
    /// Human-readable failure reason; `None` when `success` is `true`.
    pub error: Option<String>,
}
33
/// Response body returned by `handler` after a multipart upload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddResponse {
    /// Link returned by saving the mount after the upload.
    pub bucket_link: Link,
    /// One entry per uploaded file, in the order the files were processed.
    pub files: Vec<FileUploadResult>,
    /// Number of files processed (`successful_files + failed_files`).
    pub total_files: usize,
    /// Number of files that were added to the mount.
    pub successful_files: usize,
    /// Number of files that were rejected or failed to be added.
    pub failed_files: usize,
}
42
43pub async fn handler(
44    State(state): State<ServiceState>,
45    mut multipart: Multipart,
46) -> Result<impl IntoResponse, AddError> {
47    let mut bucket_id: Option<Uuid> = None;
48    let mut base_path: Option<String> = None;
49    let mut files: Vec<(String, Vec<u8>)> = Vec::new();
50
51    // Parse multipart form data
52    while let Some(field) = multipart.next_field().await.map_err(|e| {
53        tracing::error!("Multipart parsing error: {}", e);
54        AddError::MultipartError(e.to_string())
55    })? {
56        let field_name = field.name().unwrap_or("").to_string();
57
58        match field_name.as_str() {
59            "bucket_id" => {
60                let text = field.text().await.map_err(|e| {
61                    tracing::error!("Error reading bucket_id field: {}", e);
62                    AddError::MultipartError(e.to_string())
63                })?;
64                bucket_id = Some(Uuid::parse_str(&text).map_err(|e| {
65                    tracing::error!("Invalid bucket_id format: {}", e);
66                    AddError::InvalidRequest("Invalid bucket_id".into())
67                })?);
68                tracing::info!("Parsed bucket_id: {}", bucket_id.unwrap());
69            }
70            "mount_path" => {
71                base_path = Some(field.text().await.map_err(|e| {
72                    tracing::error!("Error reading mount_path field: {}", e);
73                    AddError::MultipartError(e.to_string())
74                })?);
75            }
76            "file" | "files" => {
77                // Get filename from the field
78                let filename = field
79                    .file_name()
80                    .map(|s| s.to_string())
81                    .unwrap_or_else(|| "unnamed".to_string());
82
83                tracing::info!("Reading file: {}", filename);
84                let file_data = field
85                    .bytes()
86                    .await
87                    .map_err(|e| {
88                        tracing::error!("Error reading file data for {}: {}", filename, e);
89                        AddError::MultipartError(e.to_string())
90                    })?
91                    .to_vec();
92
93                files.push((filename, file_data));
94            }
95            _ => {
96                tracing::warn!("Ignoring unknown field: {}", field_name);
97            }
98        }
99    }
100
101    let bucket_id =
102        bucket_id.ok_or_else(|| AddError::InvalidRequest("bucket_id is required".into()))?;
103    let base_path =
104        base_path.ok_or_else(|| AddError::InvalidRequest("mount_path is required".into()))?;
105
106    if files.is_empty() {
107        return Err(AddError::InvalidRequest(
108            "At least one file is required".into(),
109        ));
110    }
111
112    tracing::info!(
113        "Uploading {} file(s) to bucket {} at path {}",
114        files.len(),
115        bucket_id,
116        base_path
117    );
118
119    // Load mount at current head
120    tracing::info!("Loading mount for bucket {}", bucket_id);
121    let mut mount = state.peer().mount(bucket_id).await.map_err(|e| {
122        tracing::error!("Failed to load mount for bucket {}: {}", bucket_id, e);
123        e
124    })?;
125
126    let mut results = Vec::new();
127    let mut successful = 0;
128    let mut failed = 0;
129
130    // Process each file
131    tracing::info!("Processing {} files", files.len());
132    for (idx, (filename, file_data)) in files.iter().enumerate() {
133        tracing::info!("Processing file {}/{}: {}", idx + 1, files.len(), filename);
134
135        // Construct full path
136        let full_path = if base_path == "/" {
137            format!("/{}", filename)
138        } else {
139            format!("{}/{}", base_path.trim_end_matches('/'), filename)
140        };
141        tracing::info!("Full path: {}", full_path);
142
143        let mount_path_buf = PathBuf::from(&full_path);
144
145        // Validate mount path
146        if !mount_path_buf.is_absolute() {
147            tracing::warn!("Path is not absolute: {}", full_path);
148            results.push(FileUploadResult {
149                mount_path: full_path.clone(),
150                mime_type: String::new(),
151                size: file_data.len(),
152                success: false,
153                error: Some("Mount path must be absolute".to_string()),
154            });
155            failed += 1;
156            continue;
157        }
158
159        // Detect MIME type from file extension
160        let mime_type = mime_guess::from_path(&mount_path_buf)
161            .first_or_octet_stream()
162            .to_string();
163
164        let file_size = file_data.len();
165
166        // Try to add file to mount
167        match mount
168            .add(&mount_path_buf, Cursor::new(file_data.clone()))
169            .await
170        {
171            Ok(_) => {
172                tracing::info!(
173                    "✓ Added file {} ({} bytes, {})",
174                    full_path,
175                    file_size,
176                    mime_type
177                );
178                results.push(FileUploadResult {
179                    mount_path: full_path,
180                    mime_type,
181                    size: file_size,
182                    success: true,
183                    error: None,
184                });
185                successful += 1;
186            }
187            Err(e) => {
188                tracing::error!("✗ Failed to add file {}: {}", full_path, e);
189                results.push(FileUploadResult {
190                    mount_path: full_path,
191                    mime_type,
192                    size: file_size,
193                    success: false,
194                    error: Some(e.to_string()),
195                });
196                failed += 1;
197            }
198        }
199    }
200
201    let bucket_link = if successful > 0 {
202        tracing::info!("Saving mount (at least one file succeeded)");
203        state.peer().save_mount(&mount, None).await.map_err(|e| {
204            tracing::error!("Failed to save mount: {}", e);
205            tracing::error!("Error details: {:?}", e);
206            e
207        })?
208    } else {
209        tracing::error!("All files failed to upload");
210        return Err(AddError::InvalidRequest(
211            "All files failed to upload".into(),
212        ));
213    };
214
215    tracing::info!("Bucket link: {}", bucket_link);
216
217    Ok((
218        http::StatusCode::OK,
219        axum::Json(AddResponse {
220            bucket_link,
221            files: results,
222            total_files: successful + failed,
223            successful_files: successful,
224            failed_files: failed,
225        }),
226    )
227        .into_response())
228}
229
/// Errors produced while handling a bucket "add" upload.
///
/// The `IntoResponse` impl in this file maps `InvalidRequest` and
/// `MultipartError` to HTTP 400 and `Mount` to HTTP 500.
#[derive(Debug, thiserror::Error)]
pub enum AddError {
    /// The request was well-formed multipart but its contents were unusable
    /// (missing or invalid field, no files, or every file failed).
    #[error("Invalid request: {0}")]
    InvalidRequest(String),
    /// The multipart stream or one of its fields could not be read; carries
    /// the underlying error rendered as a string.
    #[error("Multipart error: {0}")]
    MultipartError(String),
    /// A mount operation failed; converted automatically from `MountError`
    /// via `#[from]`.
    #[error("Mount error: {0}")]
    Mount(#[from] MountError),
}
239
240impl IntoResponse for AddError {
241    fn into_response(self) -> Response {
242        match self {
243            AddError::InvalidRequest(msg) | AddError::MultipartError(msg) => (
244                http::StatusCode::BAD_REQUEST,
245                format!("Bad request: {}", msg),
246            )
247                .into_response(),
248            AddError::Mount(_) => (
249                http::StatusCode::INTERNAL_SERVER_ERROR,
250                "Unexpected error".to_string(),
251            )
252                .into_response(),
253        }
254    }
255}
256
/// Client-side request for adding a single file via multipart upload.
///
/// Holds the file contents as an in-memory byte buffer; the `ApiRequest`
/// impl in this file turns it into a multipart form posted to
/// `/api/v0/bucket/add`.
pub struct AddFileRequest {
    /// Bucket to add the file to; sent as the `bucket_id` form field.
    pub bucket_id: Uuid,
    /// Base path inside the bucket; sent as the `mount_path` form field.
    pub mount_path: String,
    /// Filename attached to the `file` part of the form.
    pub filename: String,
    /// Raw file contents sent as the `file` part of the form.
    pub data: Vec<u8>,
}
265
266impl ApiRequest for AddFileRequest {
267    type Response = AddResponse;
268
269    fn build_request(self, base_url: &Url, client: &Client) -> RequestBuilder {
270        let full_url = base_url.join("/api/v0/bucket/add").unwrap();
271        let form = reqwest::multipart::Form::new()
272            .text("bucket_id", self.bucket_id.to_string())
273            .text("mount_path", self.mount_path)
274            .part(
275                "file",
276                reqwest::multipart::Part::bytes(self.data).file_name(self.filename),
277            );
278        client.post(full_url).multipart(form)
279    }
280}