//! Blob upload/download endpoints for JMAP
//!
//! Implements RFC 8620 Sections 6.1 and 6.2:
//! - `POST /upload/{account_id}` — upload blobs (§6.1)
//! - `GET /download/{account_id}/{blob_id}/{name}` — download blobs (§6.2)
//! - Blob size limits and validation
//!
//! ## Mount Points
//!
//! These routes are mounted by [`crate::api::JmapServer::routes_with_auth_and_state`]
//! behind the [`crate::auth::require_auth`] middleware. Both endpoints require a
//! valid authenticated session before the handler is invoked.
//!
//! Per-account ownership enforcement (verifying that the authenticated
//! [`crate::types::Principal`] owns the `{account_id}` path parameter) is a
//! follow-up concern and is not yet implemented here.
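//!
//! ## Example Exchange
//!
//! A representative upload request and response (all values illustrative; the
//! response shape follows [`UploadResponse`]):
//!
//! ```text
//! POST /upload/alice HTTP/1.1
//! Content-Type: image/png
//!
//! <binary body>
//!
//! HTTP/1.1 201 Created
//! Content-Type: application/json
//!
//! {"accountId":"alice","blobId":"<uuid>","type":"image/png","size":1024}
//! ```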

use axum::{
    body::Body,
    extract::{Path, State},
    http::{header, StatusCode},
    response::{IntoResponse, Response},
    routing::{get, post},
    Router,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use uuid::Uuid;

/// Default maximum blob size: 50 MiB
pub const DEFAULT_MAX_BLOB_SIZE: u64 = 52_428_800;

// ─────────────────────────────────────────────────────────────────────────────
// BlobMeta
// ─────────────────────────────────────────────────────────────────────────────

/// Metadata associated with a stored blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobMeta {
    /// MIME content-type of the blob.
    pub content_type: String,
    /// Size in bytes.
    pub size: u64,
    /// Account that uploaded this blob.
    pub account_id: String,
    /// Upload timestamp (UTC).
    pub created_at: DateTime<Utc>,
}

// ─────────────────────────────────────────────────────────────────────────────
// BlobData (kept for backwards-compatibility with email_advanced.rs)
// ─────────────────────────────────────────────────────────────────────────────

/// Raw blob data returned by the legacy [`BlobStorage::get`] accessor.
#[derive(Clone)]
pub struct BlobData {
    data: Vec<u8>,
    content_type: String,
}

impl BlobData {
    /// Get the raw blob bytes.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Get the content type of the blob.
    pub fn content_type(&self) -> &str {
        &self.content_type
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// UploadError
// ─────────────────────────────────────────────────────────────────────────────

/// Errors that can arise during blob upload or download operations.
#[derive(Debug, thiserror::Error)]
pub enum UploadError {
    /// The blob body exceeds the configured size ceiling.
    #[error("blob too large: {actual} bytes exceeds maximum of {max}")]
    TooLarge {
        /// Actual body size in bytes.
        actual: u64,
        /// Configured maximum in bytes.
        max: u64,
    },

    /// The requested blob was not found.
    #[error("blob not found: {0}")]
    NotFound(String),

    /// An I/O error while writing to or reading from the filesystem backend.
    #[error("blob I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// A JSON (de)serialisation failure on the metadata sidecar.
    #[error("blob metadata error: {0}")]
    Meta(#[from] serde_json::Error),

    /// The internal RwLock was poisoned.
    #[error("blob storage lock poisoned")]
    LockPoisoned,
}

// ─────────────────────────────────────────────────────────────────────────────
// BlobBackend (private)
// ─────────────────────────────────────────────────────────────────────────────

/// Shared type alias for the in-memory blob map used by the `Memory` variant.
type MemoryBlobMap = Arc<RwLock<HashMap<String, (Vec<u8>, BlobMeta)>>>;

/// Internal storage strategy.
enum BlobBackend {
    /// Pure in-memory — blobs are lost on restart (default).
    Memory { blobs: MemoryBlobMap },
    /// Filesystem-backed — blobs survive restarts.
    ///
    /// Each blob is stored as two files under `<root>/blobs/`:
    /// - `<blob_id>`           — raw bytes
    /// - `<blob_id>.meta.json` — JSON-serialised [`BlobMeta`]
    ///
    /// An in-memory index (`Arc<RwLock<HashMap<…>>>`) mirrors the on-disk
    /// state and is rebuilt from `.meta.json` sidecars when the storage is
    /// opened via [`BlobStorage::new_filesystem`].
    FileSystem {
        root: PathBuf,
        index: Arc<RwLock<HashMap<String, BlobMeta>>>,
    },
}

// ─────────────────────────────────────────────────────────────────────────────
// BlobStorage
// ─────────────────────────────────────────────────────────────────────────────

/// Blob store for JMAP.
///
/// Two variants are available:
///
/// | Constructor | Persistence | Notes |
/// |---|---|---|
/// | [`BlobStorage::new`] | In-memory only | Existing callers unchanged |
/// | [`BlobStorage::new_filesystem`] | Survives restarts | Requires a writable directory |
///
/// Both variants enforce the same [`max_blob_size`](Self::max_blob_size) ceiling and
/// expose the same public API.
///
/// # Clone semantics
///
/// [`BlobStorage`] is cheaply cloneable — the inner state (both the `Arc`-wrapped
/// in-memory map and the on-disk index) is shared, so every clone observes the
/// same blobs.
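///
/// # Example
///
/// A minimal in-memory round trip (a sketch, assuming this module is reachable
/// as `rusmes_jmap::blob` and runs inside a Tokio runtime):
///
/// ```no_run
/// # async fn demo() -> Result<(), rusmes_jmap::blob::UploadError> {
/// use rusmes_jmap::blob::BlobStorage;
///
/// // The 1 MiB cap is illustrative; any u64 works.
/// let storage = BlobStorage::new().with_max_blob_size(1024 * 1024);
/// let blob_id = storage.upload("alice", "text/plain", b"hello").await?;
/// let (bytes, meta) = storage.download(&blob_id).await?;
/// assert_eq!(bytes, b"hello");
/// assert_eq!(meta.content_type, "text/plain");
/// # Ok(())
/// # }
/// ```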
#[derive(Clone)]
pub struct BlobStorage {
    backend: Arc<BlobBackend>,
    /// Maximum body size (bytes) accepted by [`Self::upload`].
    pub max_blob_size: u64,
}

impl BlobStorage {
    /// Create a new **in-memory** blob storage (no persistence across restarts).
    pub fn new() -> Self {
        Self {
            backend: Arc::new(BlobBackend::Memory {
                blobs: Arc::new(RwLock::new(HashMap::new())),
            }),
            max_blob_size: DEFAULT_MAX_BLOB_SIZE,
        }
    }

    /// Open (or create) a **filesystem-backed** blob storage rooted at `root`.
    ///
    /// On first call with an empty directory, the `blobs/` sub-directory is
    /// created and the in-memory index starts empty. On subsequent calls the
    /// index is rebuilt by scanning every `*.meta.json` sidecar in `blobs/`.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created or read. A sidecar
    /// file containing invalid JSON is skipped with a warning rather than
    /// failing the open.
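    ///
    /// # Example
    ///
    /// A sketch of opening a persistent store (the path and module path
    /// `rusmes_jmap::blob` are illustrative assumptions):
    ///
    /// ```no_run
    /// # async fn demo() -> Result<(), rusmes_jmap::blob::UploadError> {
    /// use rusmes_jmap::blob::BlobStorage;
    /// use std::path::PathBuf;
    ///
    /// let storage = BlobStorage::new_filesystem(PathBuf::from("/var/lib/rusmes")).await?;
    /// let blob_id = storage.upload("alice", "application/pdf", &[0u8; 16]).await?;
    /// // A later instance opened on the same root can still serve `blob_id`.
    /// # Ok(())
    /// # }
    /// ```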
    pub async fn new_filesystem(root: PathBuf) -> Result<Self, UploadError> {
        let blobs_dir = root.join("blobs");
        tokio::fs::create_dir_all(&blobs_dir).await?;

        // Rebuild the index from on-disk sidecars.
        let mut index: HashMap<String, BlobMeta> = HashMap::new();

        let mut read_dir = tokio::fs::read_dir(&blobs_dir).await?;
        while let Some(entry) = read_dir.next_entry().await? {
            let path = entry.path();
            let file_name = match path.file_name().and_then(|n| n.to_str()) {
                Some(n) => n.to_owned(),
                None => continue,
            };

            // Only process ".meta.json" sidecars.
            if !file_name.ends_with(".meta.json") {
                continue;
            }

            // Derive blob_id from the sidecar file name.
            let blob_id = file_name
                .strip_suffix(".meta.json")
                .unwrap_or(&file_name)
                .to_owned();

            let raw = tokio::fs::read(&path).await?;
            match serde_json::from_slice::<BlobMeta>(&raw) {
                Ok(meta) => {
                    index.insert(blob_id, meta);
                }
                Err(e) => {
                    tracing::warn!("Skipping corrupt blob sidecar {:?}: {}", path, e);
                }
            }
        }

        Ok(Self {
            backend: Arc::new(BlobBackend::FileSystem {
                root,
                index: Arc::new(RwLock::new(index)),
            }),
            max_blob_size: DEFAULT_MAX_BLOB_SIZE,
        })
    }

    /// Override the maximum blob size (bytes).
    ///
    /// Returns `self` for builder-style chaining.
    pub fn with_max_blob_size(mut self, max_bytes: u64) -> Self {
        self.max_blob_size = max_bytes;
        self
    }

    // ──────────────────────────────────────────────────────────────────────
    // Async API (preferred for production code)
    // ──────────────────────────────────────────────────────────────────────

    /// Upload a blob and return the generated `blob_id`.
    ///
    /// The body is rejected with [`UploadError::TooLarge`] before any bytes
    /// are written if it exceeds [`Self::max_blob_size`].
    pub async fn upload(
        &self,
        account_id: &str,
        content_type: &str,
        body: &[u8],
    ) -> Result<String, UploadError> {
        // Size-limit check happens before any I/O.
        let actual = body.len() as u64;
        if actual > self.max_blob_size {
            return Err(UploadError::TooLarge {
                actual,
                max: self.max_blob_size,
            });
        }

        let blob_id = Uuid::new_v4().to_string();
        let meta = BlobMeta {
            content_type: content_type.to_owned(),
            size: actual,
            account_id: account_id.to_owned(),
            created_at: Utc::now(),
        };

        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let mut guard = blobs.write().map_err(|_| UploadError::LockPoisoned)?;
                guard.insert(blob_id.clone(), (body.to_vec(), meta));
            }
            BlobBackend::FileSystem { root, index } => {
                let blobs_dir = root.join("blobs");
                let tmp_path = blobs_dir.join(format!("{}.tmp", blob_id));
                let final_path = blobs_dir.join(&blob_id);
                let meta_path = blobs_dir.join(format!("{}.meta.json", blob_id));

                // Write bytes atomically: temp → rename.
                tokio::fs::write(&tmp_path, body).await?;
                tokio::fs::rename(&tmp_path, &final_path).await?;

                // Write metadata sidecar.
                let meta_bytes = serde_json::to_vec(&meta)?;
                tokio::fs::write(&meta_path, &meta_bytes).await?;

                // Update in-memory index.
                let mut guard = index.write().map_err(|_| UploadError::LockPoisoned)?;
                guard.insert(blob_id.clone(), meta);
            }
        }

        Ok(blob_id)
    }

    /// Download a blob's raw bytes together with its metadata.
    ///
    /// Returns `Ok((bytes, meta))` or [`UploadError::NotFound`].
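    ///
    /// A usage sketch (assumes this module is reachable as `rusmes_jmap::blob`;
    /// the blob ID is illustrative):
    ///
    /// ```no_run
    /// # async fn demo(storage: &rusmes_jmap::blob::BlobStorage) {
    /// match storage.download("some-blob-id").await {
    ///     Ok((bytes, meta)) => println!("{} bytes of {}", bytes.len(), meta.content_type),
    ///     Err(e) => eprintln!("download failed: {e}"),
    /// }
    /// # }
    /// ```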
    pub async fn download(&self, blob_id: &str) -> Result<(Vec<u8>, BlobMeta), UploadError> {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let guard = blobs.read().map_err(|_| UploadError::LockPoisoned)?;
                match guard.get(blob_id) {
                    Some((data, meta)) => Ok((data.clone(), meta.clone())),
                    None => Err(UploadError::NotFound(blob_id.to_owned())),
                }
            }
            BlobBackend::FileSystem { root, index } => {
                // Check index first.
                let meta = {
                    let guard = index.read().map_err(|_| UploadError::LockPoisoned)?;
                    guard
                        .get(blob_id)
                        .cloned()
                        .ok_or_else(|| UploadError::NotFound(blob_id.to_owned()))?
                };

                let blob_path = root.join("blobs").join(blob_id);
                let data = tokio::fs::read(&blob_path).await?;
                Ok((data, meta))
            }
        }
    }

    /// Delete a blob by ID.
    ///
    /// Returns [`UploadError::NotFound`] if the blob does not exist.
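    ///
    /// A usage sketch (assumes the `rusmes_jmap::blob` path):
    ///
    /// ```no_run
    /// # async fn demo(storage: &rusmes_jmap::blob::BlobStorage, id: &str) {
    /// if let Err(e) = storage.delete(id).await {
    ///     eprintln!("delete failed: {e}");
    /// }
    /// # }
    /// ```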
    pub async fn delete(&self, blob_id: &str) -> Result<(), UploadError> {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let mut guard = blobs.write().map_err(|_| UploadError::LockPoisoned)?;
                if guard.remove(blob_id).is_none() {
                    return Err(UploadError::NotFound(blob_id.to_owned()));
                }
            }
            BlobBackend::FileSystem { root, index } => {
                {
                    let mut guard = index.write().map_err(|_| UploadError::LockPoisoned)?;
                    if guard.remove(blob_id).is_none() {
                        return Err(UploadError::NotFound(blob_id.to_owned()));
                    }
                }
                let blobs_dir = root.join("blobs");
                let blob_path = blobs_dir.join(blob_id);
                let meta_path = blobs_dir.join(format!("{}.meta.json", blob_id));
                // Best-effort removals — don't fail if the file is already gone.
                let _ = tokio::fs::remove_file(&blob_path).await;
                let _ = tokio::fs::remove_file(&meta_path).await;
            }
        }
        Ok(())
    }

    /// Return the number of blobs currently held in this storage instance.
    pub async fn blob_count(&self) -> Result<usize, UploadError> {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let guard = blobs.read().map_err(|_| UploadError::LockPoisoned)?;
                Ok(guard.len())
            }
            BlobBackend::FileSystem { index, .. } => {
                let guard = index.read().map_err(|_| UploadError::LockPoisoned)?;
                Ok(guard.len())
            }
        }
    }

    // ──────────────────────────────────────────────────────────────────────
    // Legacy synchronous API (for backwards-compat with email_advanced.rs)
    // ──────────────────────────────────────────────────────────────────────

    /// Store a blob using the legacy synchronous path.
    ///
    /// Only effective on the **memory** variant. On a filesystem-backed store
    /// the call logs a warning and silently drops the blob, because this
    /// method cannot perform async I/O; use [`Self::upload`] there instead.
    pub fn store(&self, blob_id: String, data: Vec<u8>, content_type: String) {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let meta = BlobMeta {
                    content_type: content_type.clone(),
                    size: data.len() as u64,
                    account_id: String::new(),
                    created_at: Utc::now(),
                };
                if let Ok(mut guard) = blobs.write() {
                    guard.insert(blob_id, (data, meta));
                }
            }
            BlobBackend::FileSystem { .. } => {
                tracing::warn!(
                    "BlobStorage::store() called on filesystem backend — \
                     use BlobStorage::upload() for filesystem persistence"
                );
            }
        }
    }

    /// Retrieve a blob using the legacy synchronous API.
    pub fn get(&self, blob_id: &str) -> Option<BlobData> {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let guard = blobs.read().ok()?;
                guard.get(blob_id).map(|(data, meta)| BlobData {
                    data: data.clone(),
                    content_type: meta.content_type.clone(),
                })
            }
            BlobBackend::FileSystem { root, index } => {
                // Synchronous read from disk — only suitable for small blobs
                // and test code paths.
                let meta = {
                    let guard = index.read().ok()?;
                    guard.get(blob_id)?.clone()
                };
                let blob_path = root.join("blobs").join(blob_id);
                let data = std::fs::read(&blob_path).ok()?;
                Some(BlobData {
                    data,
                    content_type: meta.content_type,
                })
            }
        }
    }

    /// Get blob size using the legacy synchronous API.
    pub fn size(&self, blob_id: &str) -> Option<usize> {
        match self.backend.as_ref() {
            BlobBackend::Memory { blobs } => {
                let guard = blobs.read().ok()?;
                guard.get(blob_id).map(|(data, _)| data.len())
            }
            BlobBackend::FileSystem { index, .. } => {
                let guard = index.read().ok()?;
                guard.get(blob_id).map(|m| m.size as usize)
            }
        }
    }
}

impl Default for BlobStorage {
    fn default() -> Self {
        Self::new()
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// HTTP response types
// ─────────────────────────────────────────────────────────────────────────────

/// Success response for a blob upload (RFC 8620 §6.1).
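///
/// Serialises as, for example (illustrative values):
///
/// ```json
/// {"accountId":"alice","blobId":"<uuid>","type":"image/png","size":1024}
/// ```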
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadResponse {
    pub account_id: String,
    pub blob_id: String,
    #[serde(rename = "type")]
    pub content_type: String,
    pub size: usize,
}

/// JSON error body for a failed blob upload.
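///
/// Serialises as, for example (illustrative values; `detail` is omitted when
/// `None`):
///
/// ```json
/// {"type":"urn:ietf:params:jmap:error:tooLarge","status":413,"detail":"…"}
/// ```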
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UploadErrorBody {
    #[serde(rename = "type")]
    pub error_type: String,
    pub status: u16,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}

// ─────────────────────────────────────────────────────────────────────────────
// Axum routes
// ─────────────────────────────────────────────────────────────────────────────

/// Create the blob router.
///
/// Mounts:
/// - `GET /download/{account_id}/{blob_id}/{name}` — blob download (RFC 8620 §6.2)
/// - `POST /upload/{account_id}` — blob upload (RFC 8620 §6.1)
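///
/// # Example
///
/// A minimal sketch of serving the router (assumes axum 0.8, which uses the
/// brace path syntax seen below; the bind address is illustrative):
///
/// ```no_run
/// # async fn serve() -> Result<(), Box<dyn std::error::Error>> {
/// use rusmes_jmap::blob::{blob_routes, BlobStorage};
///
/// let app = blob_routes().with_state(BlobStorage::new());
/// let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
/// axum::serve(listener, app).await?;
/// # Ok(())
/// # }
/// ```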
pub fn blob_routes() -> Router<BlobStorage> {
    Router::new()
        .route("/download/{account}/{blob}/{name}", get(download_blob))
        .route("/upload/{account}", post(upload_blob))
}

/// Download blob endpoint.
async fn download_blob(
    Path((account, blob_id, name)): Path<(String, String, String)>,
    State(storage): State<BlobStorage>,
) -> Response {
    if account.is_empty() {
        return (StatusCode::BAD_REQUEST, "Invalid account ID").into_response();
    }

    match storage.download(&blob_id).await {
        Ok((data, meta)) => {
            // `http::HeaderValue` rejects CR/LF and other control bytes, so a
            // hostile `name` cannot inject headers here; building the response
            // fails instead and is handled by the error branch below.
            match Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, &meta.content_type)
                .header(
                    header::CONTENT_DISPOSITION,
                    format!("attachment; filename=\"{}\"", name),
                )
                .header(header::CONTENT_LENGTH, data.len())
                .body(Body::from(data))
            {
                Ok(response) => response,
                Err(e) => (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    format!("Failed to build response: {}", e),
                )
                    .into_response(),
            }
        }
        Err(UploadError::NotFound(_)) => (StatusCode::NOT_FOUND, "Blob not found").into_response(),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Storage error: {}", e),
        )
            .into_response(),
    }
}

/// Upload blob endpoint.
async fn upload_blob(
    Path(account): Path<String>,
    State(storage): State<BlobStorage>,
    headers: axum::http::HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    if account.is_empty() {
        let error = UploadErrorBody {
            error_type: "urn:ietf:params:jmap:error:invalidArguments".to_string(),
            status: 400,
            detail: Some("Invalid account ID".to_string()),
        };
        return (StatusCode::BAD_REQUEST, axum::Json(error)).into_response();
    }

    // Fall back to the generic binary type when no Content-Type is supplied.
    let content_type = headers
        .get(header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("application/octet-stream")
        .to_string();

    match storage.upload(&account, &content_type, &body).await {
        Ok(blob_id) => {
            let response = UploadResponse {
                account_id: account,
                blob_id,
                content_type,
                size: body.len(),
            };
            (StatusCode::CREATED, axum::Json(response)).into_response()
        }
        Err(UploadError::TooLarge { actual, max }) => {
            let error = UploadErrorBody {
                error_type: "urn:ietf:params:jmap:error:tooLarge".to_string(),
                status: 413,
                detail: Some(format!(
                    "Blob size {} bytes exceeds maximum of {} bytes",
                    actual, max
                )),
            };
            (StatusCode::PAYLOAD_TOO_LARGE, axum::Json(error)).into_response()
        }
        Err(e) => {
            let error = UploadErrorBody {
                error_type: "urn:ietf:params:jmap:error:serverFail".to_string(),
                status: 500,
                detail: Some(format!("Upload failed: {}", e)),
            };
            (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(error)).into_response()
        }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Content-addressed blob ID helper
// ─────────────────────────────────────────────────────────────────────────────

/// Compute a stable, content-addressed blob ID: the hex-encoded SHA-256 of the
/// given bytes. The result is stable across process restarts and replicas.
///
/// JMAP treats blob IDs as opaque strings, and clients re-discover them each
/// session, so switching from the old `format!("blob-{}", id)` scheme to this
/// content-addressed scheme is non-breaking for compliant clients.
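///
/// A doctest sketch (assumes this module is reachable as `rusmes_jmap::blob`):
///
/// ```
/// # use rusmes_jmap::blob::compute_blob_id;
/// // A SHA-256 digest renders as 64 hex characters.
/// assert_eq!(compute_blob_id(b"hello").len(), 64);
/// ```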
pub fn compute_blob_id(bytes: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    format!("{:x}", hasher.finalize())
}

// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Generate a deterministic blob ID from data (SHA-256, 'G'-prefixed).
    /// Used only in tests; production code uses UUID v4.
    fn generate_blob_id(data: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(data);
        let result = hasher.finalize();
        format!("G{:x}", result)
    }

    // ── legacy synchronous memory tests (preserved) ────────────────────────

    #[test]
    fn test_blob_storage_store_and_get() {
        let storage = BlobStorage::new();
        let data = b"test data".to_vec();
        let blob_id = "blob123".to_string();

        storage.store(blob_id.clone(), data.clone(), "text/plain".to_string());

        let retrieved = storage.get(&blob_id).expect("blob should exist");
        assert_eq!(retrieved.data(), data.as_slice());
        assert_eq!(retrieved.content_type(), "text/plain");
    }

    #[test]
    fn test_blob_storage_size() {
        let storage = BlobStorage::new();
        let data = b"test data".to_vec();
        let blob_id = "blob123".to_string();

        storage.store(blob_id.clone(), data.clone(), "text/plain".to_string());

        assert_eq!(storage.size(&blob_id), Some(9));
    }

    #[test]
    fn test_blob_storage_get_nonexistent() {
        let storage = BlobStorage::new();
        assert!(storage.get("nonexistent").is_none());
    }

    #[test]
    fn test_generate_blob_id() {
        let data1 = b"test data";
        let data2 = b"test data";
        let data3 = b"different data";

        let id1 = generate_blob_id(data1);
        let id2 = generate_blob_id(data2);
        let id3 = generate_blob_id(data3);

        assert_eq!(id1, id2);
        assert_ne!(id1, id3);
        assert!(id1.starts_with('G'));
    }

    #[test]
    fn test_blob_id_length() {
        let data = b"test data";
        let blob_id = generate_blob_id(data);
        // SHA-256 hash is 64 hex chars + 1 for the 'G' prefix.
        assert_eq!(blob_id.len(), 65);
    }

    #[test]
    fn test_blob_storage_multiple_blobs() {
        let storage = BlobStorage::new();

        for i in 0..10 {
            let data = format!("data{}", i).into_bytes();
            let blob_id = format!("blob{}", i);
            storage.store(blob_id.clone(), data, "text/plain".to_string());
        }

        for i in 0..10 {
            let blob_id = format!("blob{}", i);
            assert!(storage.get(&blob_id).is_some());
        }
    }

    #[test]
    fn test_blob_storage_overwrite() {
        let storage = BlobStorage::new();
        let blob_id = "blob123".to_string();

        storage.store(
            blob_id.clone(),
            b"original".to_vec(),
            "text/plain".to_string(),
        );
        storage.store(
            blob_id.clone(),
            b"updated".to_vec(),
            "text/html".to_string(),
        );

        let retrieved = storage.get(&blob_id).expect("blob should exist");
        assert_eq!(retrieved.data(), b"updated");
        assert_eq!(retrieved.content_type(), "text/html");
    }

    #[test]
    fn test_blob_storage_empty_data() {
        let storage = BlobStorage::new();
        let blob_id = "empty".to_string();

        storage.store(
            blob_id.clone(),
            vec![],
            "application/octet-stream".to_string(),
        );

        let retrieved = storage.get(&blob_id).expect("blob should exist");
        assert_eq!(retrieved.data().len(), 0);
    }

    #[test]
    fn test_blob_storage_large_data() {
        let storage = BlobStorage::new();
        let data = vec![0u8; 1024 * 1024]; // 1 MiB
        let blob_id = "large".to_string();

        storage.store(
            blob_id.clone(),
            data,
            "application/octet-stream".to_string(),
        );

        assert_eq!(storage.size(&blob_id), Some(1024 * 1024));
    }

    #[test]
    fn test_upload_error_serialization() {
        let error = UploadErrorBody {
            error_type: "urn:ietf:params:jmap:error:tooLarge".to_string(),
            status: 413,
            detail: Some("Too large".to_string()),
        };

        let json = serde_json::to_string(&error).expect("serialization should succeed");
        assert!(json.contains("tooLarge"));
        assert!(json.contains("413"));
    }

    #[test]
    fn test_upload_response_serialization() {
        let response = UploadResponse {
            account_id: "acc1".to_string(),
            blob_id: "blob123".to_string(),
            content_type: "image/png".to_string(),
            size: 1024,
        };

        let json = serde_json::to_string(&response).expect("serialization should succeed");
        assert!(json.contains("blob123"));
        assert!(json.contains("image/png"));
    }

    #[test]
    fn test_blob_storage_clone() {
        let storage1 = BlobStorage::new();
        storage1.store(
            "blob1".to_string(),
            b"data".to_vec(),
            "text/plain".to_string(),
        );

        let storage2 = storage1.clone();
        assert!(storage2.get("blob1").is_some());
    }

    #[test]
    fn test_blob_data_clone() {
        let data1 = BlobData {
            data: b"test".to_vec(),
            content_type: "text/plain".to_string(),
        };

        let data2 = data1.clone();
        assert_eq!(data1.data(), data2.data());
        assert_eq!(data1.content_type(), data2.content_type());
    }

    #[test]
    fn test_blob_storage_default() {
        let storage = BlobStorage::default();
        assert!(storage.get("any").is_none());
    }

    #[test]
    fn test_blob_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();

        for i in 0..100 {
            let data = format!("unique data {}", i).into_bytes();
            let id = generate_blob_id(&data);
            assert!(ids.insert(id), "Duplicate blob ID generated");
        }
    }

    #[test]
    fn test_blob_storage_concurrent_access() {
        let storage = BlobStorage::new();

        storage.store(
            "blob1".to_string(),
            b"data1".to_vec(),
            "text/plain".to_string(),
        );

        let storage2 = storage.clone();
        storage2.store(
            "blob2".to_string(),
            b"data2".to_vec(),
            "text/html".to_string(),
        );

        assert!(storage.get("blob1").is_some());
        assert!(storage.get("blob2").is_some());
        assert!(storage2.get("blob1").is_some());
        assert!(storage2.get("blob2").is_some());
    }

    #[test]
    fn test_blob_storage_size_nonexistent() {
        let storage = BlobStorage::new();
        assert_eq!(storage.size("nonexistent"), None);
    }

    #[test]
    fn test_blob_id_format() {
        let data = b"test";
        let blob_id = generate_blob_id(data);
        assert!(blob_id.chars().skip(1).all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn test_upload_error_without_detail() {
        let error = UploadErrorBody {
            error_type: "urn:ietf:params:jmap:error:serverFail".to_string(),
            status: 500,
            detail: None,
        };

        let json = serde_json::to_string(&error).expect("serialization should succeed");
        assert!(!json.contains("detail"));
    }

    #[test]
    fn test_blob_id_deterministic() {
        let data = b"consistent data";
        let id1 = generate_blob_id(data);
        let id2 = generate_blob_id(data);
        let id3 = generate_blob_id(data);

        assert_eq!(id1, id2);
        assert_eq!(id2, id3);
    }

    #[test]
    fn test_max_blob_size_constant() {
        assert_eq!(DEFAULT_MAX_BLOB_SIZE, 50 * 1024 * 1024);
    }

    // ── new async memory tests ─────────────────────────────────────────────

    /// Upload to memory, download, verify the bytes match.
    #[tokio::test]
    async fn test_memory_roundtrip() {
        let storage = BlobStorage::new();
        let payload = b"hello, JMAP blob world!";

        let blob_id = storage
            .upload("account-alice", "text/plain", payload)
            .await
            .expect("upload should succeed");

        let (data, meta) = storage
            .download(&blob_id)
            .await
            .expect("download should succeed");

        assert_eq!(data.as_slice(), payload);
        assert_eq!(meta.content_type, "text/plain");
        assert_eq!(meta.account_id, "account-alice");
        assert_eq!(meta.size, payload.len() as u64);
    }

    /// Upload 49 MiB — must succeed with the default 50 MiB limit.
    #[tokio::test]
    async fn test_size_limit_accepted() {
        let storage = BlobStorage::new();
        // 49 MiB — just under the 50 MiB default ceiling.
        let payload = vec![0xABu8; 49 * 1024 * 1024];

        let result = storage
            .upload("account-alice", "application/octet-stream", &payload)
            .await;
        assert!(
            result.is_ok(),
            "49 MiB upload should succeed, got {:?}",
            result
        );
    }

    /// Upload 51 MiB — must be rejected with TooLarge.
    #[tokio::test]
    async fn test_size_limit_rejected() {
        let storage = BlobStorage::new();
        // 51 MiB — over the 50 MiB default ceiling.
        let payload = vec![0xFFu8; 51 * 1024 * 1024];

        let err = storage
            .upload("account-alice", "application/octet-stream", &payload)
            .await
            .expect_err("51 MiB upload should be rejected");

        match err {
            UploadError::TooLarge { actual, max } => {
                assert_eq!(actual, 51 * 1024 * 1024);
                assert_eq!(max, DEFAULT_MAX_BLOB_SIZE);
            }
            other => panic!("expected TooLarge, got {:?}", other),
        }
    }

    /// Custom size limit: upload right at the boundary.
    #[tokio::test]
    async fn test_custom_size_limit() {
        let storage = BlobStorage::new().with_max_blob_size(1024);
        let payload_ok = vec![0u8; 1024];
        let payload_bad = vec![0u8; 1025];

        assert!(
            storage
                .upload("acc", "application/octet-stream", &payload_ok)
                .await
                .is_ok(),
            "Exactly-at-limit upload should succeed"
        );
        let err = storage
            .upload("acc", "application/octet-stream", &payload_bad)
            .await
            .expect_err("Over-limit upload should fail");
        assert!(matches!(err, UploadError::TooLarge { .. }));
    }

    /// Delete removes the blob from the store.
    #[tokio::test]
    async fn test_memory_delete() {
        let storage = BlobStorage::new();
        let blob_id = storage
            .upload("acc", "text/plain", b"delete me")
            .await
            .expect("upload should succeed");

        storage
            .delete(&blob_id)
            .await
            .expect("delete should succeed");

        let err = storage
            .download(&blob_id)
            .await
            .expect_err("download after delete should fail");
        assert!(matches!(err, UploadError::NotFound(_)));
    }

    // ── filesystem backend tests ───────────────────────────────────────────

    /// Upload to filesystem, drop storage, open a new instance, download → same bytes.
    #[tokio::test]
    async fn test_filesystem_roundtrip() {
        let tmp = std::env::temp_dir().join(format!("rusmes_blob_roundtrip_{}", Uuid::new_v4()));
        let payload = b"filesystem roundtrip payload";

        let blob_id = {
            let storage = BlobStorage::new_filesystem(tmp.clone())
                .await
                .expect("new_filesystem should succeed");
            storage
                .upload("account-bob", "text/plain", payload)
                .await
                .expect("upload should succeed")
        }; // storage is dropped here

        // Re-open the same root; the index must be rebuilt from sidecars.
        let storage2 = BlobStorage::new_filesystem(tmp.clone())
            .await
            .expect("re-open should succeed");

        let (data, meta) = storage2
            .download(&blob_id)
            .await
            .expect("download after re-open should succeed");

        assert_eq!(data.as_slice(), payload);
        assert_eq!(meta.content_type, "text/plain");
        assert_eq!(meta.account_id, "account-bob");

        // Cleanup
        let _ = tokio::fs::remove_dir_all(&tmp).await;
    }

    /// After a restart, the index count must match the number of blobs written.
    #[tokio::test]
    async fn test_filesystem_index_rebuild() {
        let tmp = std::env::temp_dir().join(format!("rusmes_blob_index_{}", Uuid::new_v4()));

        const N: usize = 5;

        {
            let storage = BlobStorage::new_filesystem(tmp.clone())
                .await
                .expect("new_filesystem should succeed");

            for i in 0..N {
                let payload = format!("blob payload {}", i);
                storage
                    .upload("account-test", "text/plain", payload.as_bytes())
                    .await
                    .expect("upload should succeed");
            }

            let count = storage.blob_count().await.expect("count should succeed");
            assert_eq!(count, N);
        } // storage dropped

        // Re-open; the index must match.
        let storage2 = BlobStorage::new_filesystem(tmp.clone())
            .await
            .expect("re-open should succeed");

        let count = storage2.blob_count().await.expect("count should succeed");
        assert_eq!(count, N, "Index must be fully rebuilt after restart");

        // Cleanup
        let _ = tokio::fs::remove_dir_all(&tmp).await;
    }

    /// Filesystem backend also enforces size limits (before any I/O).
    #[tokio::test]
    async fn test_filesystem_size_limit_rejected() {
        let tmp = std::env::temp_dir().join(format!("rusmes_blob_sizelimit_{}", Uuid::new_v4()));

        let storage = BlobStorage::new_filesystem(tmp.clone())
            .await
            .expect("new_filesystem should succeed")
            .with_max_blob_size(512);

        let payload = vec![0u8; 513];
        let err = storage
            .upload("account-test", "application/octet-stream", &payload)
            .await
            .expect_err("over-limit upload should fail");

        assert!(matches!(err, UploadError::TooLarge { .. }));

        // Cleanup
        let _ = tokio::fs::remove_dir_all(&tmp).await;
    }

    /// Delete removes both files from the filesystem.
    #[tokio::test]
    async fn test_filesystem_delete() {
        let tmp = std::env::temp_dir().join(format!("rusmes_blob_delete_{}", Uuid::new_v4()));

        let storage = BlobStorage::new_filesystem(tmp.clone())
            .await
            .expect("new_filesystem should succeed");

        let blob_id = storage
            .upload("account-test", "text/plain", b"to be deleted")
            .await
            .expect("upload should succeed");

        storage
            .delete(&blob_id)
            .await
            .expect("delete should succeed");

        // The blob should be gone from the index.
        let err = storage
            .download(&blob_id)
            .await
            .expect_err("download after delete should fail");
        assert!(matches!(err, UploadError::NotFound(_)));

        // The data file should be gone from disk.
        let blob_path = tmp.join("blobs").join(&blob_id);
        assert!(!blob_path.exists(), "blob file should have been removed");

        // Cleanup
        let _ = tokio::fs::remove_dir_all(&tmp).await;
    }
}