//! TUS (Resumable Upload Protocol) implementation for the Salvo web framework.
//!
//! [TUS](https://tus.io/) is an open protocol for resumable file uploads over HTTP.
//! It allows reliable uploads of large files by enabling pause and resume functionality,
//! making it ideal for unreliable network conditions.
//!
//! # Features
//!
//! - Resumable uploads - Clients can resume interrupted uploads
//! - Upload metadata - Attach custom metadata to uploads
//! - Configurable max size - Limit upload file sizes
//! - Lifecycle hooks - React to upload events
//! - Customizable storage - Implement your own storage backend
//!
//! # Example
//!
//! ```ignore
//! use salvo_tus::Tus;
//! use salvo_tus::options::MaxSize;
//! use salvo_core::prelude::*;
//!
//! let tus = Tus::new()
//!     .path("/uploads")
//!     .max_size(MaxSize::Fixed(100 * 1024 * 1024));  // 100 MB limit
//!
//! let router = Router::new()
//!     .push(tus.into_router());
//!
//! let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
//! Server::new(acceptor).serve(router).await;
//! ```
//!
//! # TUS Protocol Endpoints
//!
//! The router created by `into_router()` handles:
//!
//! | Method | Path | Description |
//! |--------|------|-------------|
//! | OPTIONS | `/uploads` | Returns TUS protocol capabilities |
//! | POST | `/uploads` | Creates a new upload |
//! | HEAD | `/uploads/{id}` | Returns upload progress |
//! | PATCH | `/uploads/{id}` | Uploads a chunk |
//! | DELETE | `/uploads/{id}` | Cancels an upload |
//! | GET | `/uploads/{id}` | Downloads the uploaded file |
//!
//! # Lifecycle Hooks
//!
//! React to upload events:
//!
//! ```ignore
//! let tus = Tus::new()
//!     .with_on_upload_create(|req, upload_info| async move {
//!         println!("New upload: {:?}", upload_info);
//!         Ok(UploadPatch::default())
//!     })
//!     .with_on_upload_finish(|req, upload_info| async move {
//!         println!("Upload complete: {:?}", upload_info);
//!         Ok(UploadFinishPatch::default())
//!     });
//! ```
//!
//! # Custom Upload ID
//!
//! Generate custom upload IDs:
//!
//! ```ignore
//! let tus = Tus::new()
//!     .with_upload_id_naming_function(|req, metadata| async move {
//!         Ok(uuid::Uuid::new_v4().to_string())
//!     });
//! ```
//!
//! # Storage Backends
//!
//! By default, files are stored on disk using `DiskStore`.
//! Implement the `DataStore` trait for custom storage (S3, database, etc.).
//!
//! Read more: <https://salvo.rs>

79use std::sync::Arc;
80
81use tokio::sync::watch;
82
83use crate::error::TusError;
84use crate::handlers::{GenerateUrlCtx, Metadata};
85use crate::lockers::Locker;
86use crate::options::{MaxSize, TusOptions, UploadFinishPatch, UploadPatch};
87use crate::stores::{DataStore, DiskStore, UploadInfo};
88use crate::utils::normalize_path;
89
90mod error;
91mod handlers;
92mod lockers;
93mod stores;
94
95pub mod options;
96pub mod utils;
97
98use salvo_core::{Depot, Request, Router, handler};
99
/// TUS protocol version string used by this crate.
pub const TUS_VERSION: &str = "1.0.0";
/// Lowercase name of the `Tus-Resumable` header.
pub const H_TUS_RESUMABLE: &str = "tus-resumable";
/// Lowercase name of the `Tus-Version` header.
pub const H_TUS_VERSION: &str = "tus-version";
/// Lowercase name of the `Tus-Extension` header.
pub const H_TUS_EXTENSION: &str = "tus-extension";
/// Lowercase name of the `Tus-Max-Size` header.
pub const H_TUS_MAX_SIZE: &str = "tus-max-size";

/// Lowercase name of the `Access-Control-Allow-Methods` header.
pub const H_ACCESS_CONTROL_ALLOW_METHODS: &str = "access-control-allow-methods";
/// Lowercase name of the `Access-Control-Allow-Headers` header.
pub const H_ACCESS_CONTROL_ALLOW_HEADERS: &str = "access-control-allow-headers";
/// Lowercase name of the `Access-Control-Request-Headers` header.
pub const H_ACCESS_CONTROL_REQUEST_HEADERS: &str = "access-control-request-headers";
/// Lowercase name of the `Access-Control-Max-Age` header.
pub const H_ACCESS_CONTROL_MAX_AGE: &str = "access-control-max-age";

/// Lowercase name of the `Upload-Length` header.
pub const H_UPLOAD_LENGTH: &str = "upload-length";
/// Lowercase name of the `Upload-Offset` header.
pub const H_UPLOAD_OFFSET: &str = "upload-offset";
/// Lowercase name of the `Upload-Metadata` header.
pub const H_UPLOAD_METADATA: &str = "upload-metadata";
/// Lowercase name of the `Upload-Concat` header.
pub const H_UPLOAD_CONCAT: &str = "upload-concat";
/// Lowercase name of the `Upload-Defer-Length` header.
pub const H_UPLOAD_DEFER_LENGTH: &str = "upload-defer-length";
/// Lowercase name of the `Upload-Expires` header.
pub const H_UPLOAD_EXPIRES: &str = "upload-expires";

/// Lowercase name of the `Content-Type` header.
pub const H_CONTENT_TYPE: &str = "content-type";
/// Lowercase name of the `Content-Length` header.
pub const H_CONTENT_LENGTH: &str = "content-length";
/// Media type string `application/offset+octet-stream`.
pub const CT_OFFSET_OCTET_STREAM: &str = "application/offset+octet-stream";
121
/// Reason reported by a [`CancellationSignal`] once cancellation has been requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancellationReason {
    /// Published by [`CancellationContext::abort`].
    Abort,
    /// Published by [`CancellationContext::cancel`]; also the value
    /// [`CancellationSignal::cancelled`] falls back to when the sending side is gone.
    Cancel,
}
127
128#[derive(Clone, Debug)]
129pub struct CancellationSignal {
130    receiver: watch::Receiver<Option<CancellationReason>>,
131}
132
133impl CancellationSignal {
134    pub fn reason(&self) -> Option<CancellationReason> {
135        *self.receiver.borrow()
136    }
137
138    pub fn is_cancelled(&self) -> bool {
139        self.reason().is_some()
140    }
141
142    pub fn is_aborted(&self) -> bool {
143        matches!(self.reason(), Some(CancellationReason::Abort))
144    }
145
146    pub async fn cancelled(&mut self) -> CancellationReason {
147        loop {
148            if let Some(reason) = *self.receiver.borrow() {
149                return reason;
150            }
151            if self.receiver.changed().await.is_err() {
152                return CancellationReason::Cancel;
153            }
154        }
155    }
156}
157
158#[derive(Clone, Debug)]
159pub struct CancellationContext {
160    pub signal: CancellationSignal,
161    sender: watch::Sender<Option<CancellationReason>>,
162}
163
164impl CancellationContext {
165    pub fn new() -> Self {
166        let (sender, receiver) = watch::channel(None);
167        Self {
168            signal: CancellationSignal { receiver },
169            sender,
170        }
171    }
172
173    pub fn abort(&self) {
174        let _ = self.sender.send(Some(CancellationReason::Abort));
175    }
176
177    pub fn cancel(&self) {
178        let _ = self.sender.send(Some(CancellationReason::Cancel));
179    }
180}
181
182impl Default for CancellationContext {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188#[derive(Clone)]
189struct TusStateHoop {
190    state: Arc<Tus>,
191}
192
193#[handler]
194impl TusStateHoop {
195    async fn handle(&self, depot: &mut Depot) {
196        depot.inject(self.state.clone());
197    }
198}
199
200#[derive(Clone)]
201pub struct Tus {
202    options: TusOptions,
203    store: Arc<dyn DataStore>,
204}
205impl Default for Tus {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
211// Tus service Configuration
212impl Tus {
213    pub fn new() -> Self {
214        Self {
215            options: TusOptions::default(),
216            store: Arc::new(DiskStore::new()),
217        }
218    }
219
220    pub fn path(mut self, path: impl Into<String>) -> Self {
221        self.options.path = path.into();
222        self
223    }
224
225    pub fn max_size(mut self, max_size: MaxSize) -> Self {
226        self.options.max_size = Some(max_size);
227        self
228    }
229
230    pub fn relative_location(mut self, yes: bool) -> Self {
231        self.options.relative_location = yes;
232        self
233    }
234
235    pub fn with_store(mut self, store: impl DataStore) -> Self {
236        self.store = Arc::new(store);
237        self
238    }
239
240    pub fn with_locker(mut self, locker: impl Locker) -> Self {
241        self.options.locker = Arc::new(locker);
242        self
243    }
244
245    pub fn into_router(self) -> Router {
246        let base_path = normalize_path(&self.options.path);
247        let state = Arc::new(self);
248
249        Router::with_path(base_path)
250            .hoop(TusStateHoop {
251                state: state.clone(),
252            })
253            .push(handlers::options_handler())
254            .push(handlers::post_handler())
255            .push(handlers::head_handler())
256            .push(handlers::patch_handler())
257            .push(handlers::delete_handler())
258            .push(handlers::get_handler())
259    }
260}
261
262// Hooks
263impl Tus {
264    pub fn with_upload_id_naming_function<F, Fut>(mut self, f: F) -> Self
265    where
266        F: Fn(&Request, Option<Metadata>) -> Fut + Send + Sync + 'static,
267        Fut: Future<Output = Result<String, TusError>> + Send + 'static,
268    {
269        self.options.upload_id_naming_function = Arc::new(move |req, meta| Box::pin(f(req, meta)));
270        self
271    }
272
273    pub fn with_generate_url_function<F>(mut self, f: F) -> Self
274    where
275        F: Fn(&Request, GenerateUrlCtx) -> Result<String, TusError> + Send + Sync + 'static,
276    {
277        self.options.generate_url_function = Some(Arc::new(f));
278        self
279    }
280}
281
282// Lifecycle
283impl Tus {
284    pub fn with_on_incoming_request<F, Fut>(mut self, f: F) -> Self
285    where
286        F: Fn(&Request, String) -> Fut + Send + Sync + 'static,
287        Fut: Future<Output = ()> + Send + 'static,
288    {
289        self.options.on_incoming_request = Some(Arc::new(move |req, id| Box::pin(f(req, id))));
290        self
291    }
292
293    pub fn with_on_incoming_request_sync<F>(mut self, f: F) -> Self
294    where
295        F: Fn(&Request, String) + Send + Sync + 'static,
296    {
297        self.options.on_incoming_request = Some(Arc::new(move |req, id| {
298            f(req, id);
299            Box::pin(async move {})
300        }));
301        self
302    }
303
304    pub fn with_on_upload_create<F, Fut>(mut self, f: F) -> Self
305    where
306        F: Fn(&Request, UploadInfo) -> Fut + Send + Sync + 'static,
307        Fut: Future<Output = Result<UploadPatch, TusError>> + Send + 'static,
308    {
309        self.options.on_upload_create = Some(Arc::new(move |req, upload| Box::pin(f(req, upload))));
310        self
311    }
312
313    pub fn with_on_upload_finish<F, Fut>(mut self, f: F) -> Self
314    where
315        F: Fn(&Request, UploadInfo) -> Fut + Send + Sync + 'static,
316        Fut: Future<Output = Result<UploadFinishPatch, TusError>> + Send + 'static,
317    {
318        self.options.on_upload_finish = Some(Arc::new(move |req, upload| Box::pin(f(req, upload))));
319        self
320    }
321}
322
/// Unit tests for the constants, the cancellation primitives and the `Tus` builder.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- constants ----

    #[test]
    fn test_constants() {
        assert_eq!(TUS_VERSION, "1.0.0");
        assert_eq!(H_TUS_RESUMABLE, "tus-resumable");
        assert_eq!(H_TUS_VERSION, "tus-version");
        assert_eq!(H_TUS_EXTENSION, "tus-extension");
        assert_eq!(H_TUS_MAX_SIZE, "tus-max-size");
        assert_eq!(H_UPLOAD_LENGTH, "upload-length");
        assert_eq!(H_UPLOAD_OFFSET, "upload-offset");
        assert_eq!(H_UPLOAD_METADATA, "upload-metadata");
        assert_eq!(H_UPLOAD_CONCAT, "upload-concat");
        assert_eq!(H_UPLOAD_DEFER_LENGTH, "upload-defer-length");
        assert_eq!(H_UPLOAD_EXPIRES, "upload-expires");
        assert_eq!(H_CONTENT_TYPE, "content-type");
        assert_eq!(H_CONTENT_LENGTH, "content-length");
        assert_eq!(CT_OFFSET_OCTET_STREAM, "application/offset+octet-stream");
    }

    // ---- CancellationReason ----

    #[test]
    fn test_cancellation_reason_equality() {
        assert_eq!(CancellationReason::Abort, CancellationReason::Abort);
        assert_eq!(CancellationReason::Cancel, CancellationReason::Cancel);
        assert_ne!(CancellationReason::Abort, CancellationReason::Cancel);
    }

    #[test]
    // The explicit `.clone()` is the point of this test: it exercises the
    // `Clone` impl next to the implicit `Copy`.
    #[allow(clippy::clone_on_copy)]
    fn test_cancellation_reason_clone_copy() {
        let reason = CancellationReason::Abort;
        let cloned = reason.clone();
        let copied = reason;
        assert_eq!(reason, cloned);
        assert_eq!(reason, copied);
    }

    #[test]
    fn test_cancellation_reason_debug() {
        let debug = format!("{:?}", CancellationReason::Abort);
        assert_eq!(debug, "Abort");

        let debug = format!("{:?}", CancellationReason::Cancel);
        assert_eq!(debug, "Cancel");
    }

    // ---- CancellationContext / CancellationSignal ----

    #[test]
    fn test_cancellation_context_new() {
        let ctx = CancellationContext::new();
        assert!(!ctx.signal.is_cancelled());
        assert!(!ctx.signal.is_aborted());
        assert!(ctx.signal.reason().is_none());
    }

    #[test]
    fn test_cancellation_context_default() {
        let ctx = CancellationContext::default();
        assert!(!ctx.signal.is_cancelled());
    }

    #[test]
    fn test_cancellation_context_abort() {
        let ctx = CancellationContext::new();
        ctx.abort();

        assert!(ctx.signal.is_cancelled());
        assert!(ctx.signal.is_aborted());
        assert_eq!(ctx.signal.reason(), Some(CancellationReason::Abort));
    }

    #[test]
    fn test_cancellation_context_cancel() {
        let ctx = CancellationContext::new();
        ctx.cancel();

        assert!(ctx.signal.is_cancelled());
        assert!(!ctx.signal.is_aborted());
        assert_eq!(ctx.signal.reason(), Some(CancellationReason::Cancel));
    }

    #[test]
    fn test_cancellation_signal_clone() {
        let ctx = CancellationContext::new();
        let signal1 = ctx.signal.clone();
        let signal2 = ctx.signal.clone();

        assert!(!signal1.is_cancelled());
        assert!(!signal2.is_cancelled());

        ctx.abort();

        assert!(signal1.is_cancelled());
        assert!(signal2.is_cancelled());
    }

    #[test]
    fn test_cancellation_context_clone() {
        let ctx1 = CancellationContext::new();
        let ctx2 = ctx1.clone();

        // Both contexts share the same sender/receiver
        ctx1.abort();

        assert!(ctx2.signal.is_cancelled());
    }

    #[tokio::test]
    async fn test_cancellation_signal_cancelled_async() {
        let ctx = CancellationContext::new();
        let mut signal = ctx.signal.clone();

        // Spawn a task to cancel after a short delay
        let ctx_clone = ctx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            ctx_clone.cancel();
        });

        let reason = signal.cancelled().await;
        assert_eq!(reason, CancellationReason::Cancel);
    }

    #[tokio::test]
    async fn test_cancellation_signal_cancelled_already_cancelled() {
        let ctx = CancellationContext::new();
        ctx.abort();

        let mut signal = ctx.signal.clone();
        let reason = signal.cancelled().await;
        assert_eq!(reason, CancellationReason::Abort);
    }

    #[test]
    fn test_cancellation_context_debug() {
        let ctx = CancellationContext::new();
        let debug = format!("{:?}", ctx);
        assert!(debug.contains("CancellationContext"));
    }

    #[test]
    fn test_cancellation_signal_debug() {
        let ctx = CancellationContext::new();
        let debug = format!("{:?}", ctx.signal);
        assert!(debug.contains("CancellationSignal"));
    }

    // ---- Tus builder ----

    #[test]
    fn test_tus_new() {
        let tus = Tus::new();
        assert_eq!(tus.options.path, "/tus-files");
    }

    #[test]
    fn test_tus_default() {
        let tus = Tus::default();
        assert_eq!(tus.options.path, "/tus-files");
    }

    #[test]
    fn test_tus_path() {
        let tus = Tus::new().path("/custom/uploads");
        assert_eq!(tus.options.path, "/custom/uploads");
    }

    #[test]
    fn test_tus_max_size() {
        let tus = Tus::new().max_size(MaxSize::Fixed(1024 * 1024));
        match &tus.options.max_size {
            Some(MaxSize::Fixed(size)) => assert_eq!(*size, 1024 * 1024),
            _ => panic!("Expected Fixed max_size"),
        }
    }

    #[test]
    fn test_tus_relative_location() {
        let tus = Tus::new().relative_location(false);
        assert!(!tus.options.relative_location);

        let tus = Tus::new().relative_location(true);
        assert!(tus.options.relative_location);
    }

    #[test]
    fn test_tus_with_locker() {
        use lockers::memory_locker::MemoryLocker;

        let tus = Tus::new().with_locker(MemoryLocker::new());
        // Just verify it compiles and doesn't panic
        assert!(Arc::strong_count(&tus.options.locker) >= 1);
    }

    #[test]
    fn test_tus_with_store() {
        let tus = Tus::new().with_store(stores::DiskStore::new());
        // Just verify it compiles and doesn't panic
        assert!(Arc::strong_count(&tus.store) >= 1);
    }

    #[test]
    fn test_tus_into_router() {
        let tus = Tus::new().path("/uploads");
        let _router = tus.into_router();
        // Router creation should succeed
    }

    #[test]
    fn test_tus_clone() {
        let tus = Tus::new().path("/test");
        let cloned = tus.clone();
        assert_eq!(cloned.options.path, "/test");
    }

    #[test]
    fn test_tus_builder_chain() {
        let tus = Tus::new()
            .path("/api/tus")
            .max_size(MaxSize::Fixed(10 * 1024 * 1024))
            .relative_location(false);

        assert_eq!(tus.options.path, "/api/tus");
        assert!(!tus.options.relative_location);
        match &tus.options.max_size {
            Some(MaxSize::Fixed(size)) => assert_eq!(*size, 10 * 1024 * 1024),
            _ => panic!("Expected Fixed max_size"),
        }
    }

    // ---- hooks ----

    #[tokio::test]
    async fn test_tus_with_upload_id_naming_function() {
        let tus = Tus::new().with_upload_id_naming_function(|_req, _meta| async move {
            Ok("custom-id".to_string())
        });

        // Verify the function is set by calling it
        let req = Request::default();
        let result = (tus.options.upload_id_naming_function)(&req, None).await;
        assert_eq!(result.unwrap(), "custom-id");
    }

    #[test]
    fn test_tus_with_generate_url_function() {
        let tus = Tus::new().with_generate_url_function(|_req, ctx| {
            Ok(format!("https://cdn.example.com/{}", ctx.id))
        });

        assert!(tus.options.generate_url_function.is_some());
    }

    #[tokio::test]
    async fn test_tus_with_on_incoming_request() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let tus = Tus::new().with_on_incoming_request(move |_req, _id| {
            let called = called_clone.clone();
            async move {
                called.store(true, Ordering::SeqCst);
            }
        });

        assert!(tus.options.on_incoming_request.is_some());
        let req = Request::default();
        (tus.options.on_incoming_request.unwrap())(&req, "test-id".to_string()).await;
        assert!(called.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_tus_with_on_incoming_request_sync() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let tus = Tus::new().with_on_incoming_request_sync(move |_req, _id| {
            called_clone.store(true, Ordering::SeqCst);
        });

        assert!(tus.options.on_incoming_request.is_some());

        // Actually invoke the hook so the callback is exercised, not just stored.
        let req = Request::default();
        let pending = (tus.options.on_incoming_request.unwrap())(&req, "test-id".to_string());
        // The sync callback runs at call time, before the returned future is polled.
        assert!(called.load(Ordering::SeqCst));
        pending.await;
    }

    // The two registration tests below never await anything, so they are plain
    // `#[test]`s and do not need a Tokio runtime.

    #[test]
    fn test_tus_with_on_upload_create() {
        let tus = Tus::new()
            .with_on_upload_create(|_req, _upload| async move { Ok(UploadPatch::default()) });

        assert!(tus.options.on_upload_create.is_some());
    }

    #[test]
    fn test_tus_with_on_upload_finish() {
        let tus = Tus::new()
            .with_on_upload_finish(|_req, _upload| async move { Ok(UploadFinishPatch::default()) });

        assert!(tus.options.on_upload_finish.is_some());
    }
}