1use std::sync::Arc;
80
81use tokio::sync::watch;
82
83use crate::error::TusError;
84use crate::handlers::{GenerateUrlCtx, Metadata};
85use crate::lockers::Locker;
86use crate::options::{MaxSize, TusOptions, UploadFinishPatch, UploadPatch};
87use crate::stores::{DataStore, DiskStore, UploadInfo};
88use crate::utils::normalize_path;
89
90mod error;
91mod handlers;
92mod lockers;
93mod stores;
94
95pub mod options;
96pub mod utils;
97
98use salvo_core::{Depot, Request, Router, handler};
99
// --- tus protocol headers -------------------------------------------------

/// Protocol version string used by this crate (`"1.0.0"`).
pub const TUS_VERSION: &str = "1.0.0";
/// Lowercase header name `tus-resumable`.
pub const H_TUS_RESUMABLE: &str = "tus-resumable";
/// Lowercase header name `tus-version`.
pub const H_TUS_VERSION: &str = "tus-version";
/// Lowercase header name `tus-extension`.
pub const H_TUS_EXTENSION: &str = "tus-extension";
/// Lowercase header name `tus-max-size`.
pub const H_TUS_MAX_SIZE: &str = "tus-max-size";

// --- CORS headers ---------------------------------------------------------

/// Lowercase header name `access-control-allow-methods`.
pub const H_ACCESS_CONTROL_ALLOW_METHODS: &str = "access-control-allow-methods";
/// Lowercase header name `access-control-allow-headers`.
pub const H_ACCESS_CONTROL_ALLOW_HEADERS: &str = "access-control-allow-headers";
/// Lowercase header name `access-control-request-headers`.
pub const H_ACCESS_CONTROL_REQUEST_HEADERS: &str = "access-control-request-headers";
/// Lowercase header name `access-control-max-age`.
pub const H_ACCESS_CONTROL_MAX_AGE: &str = "access-control-max-age";

// --- upload headers -------------------------------------------------------

/// Lowercase header name `upload-length`.
pub const H_UPLOAD_LENGTH: &str = "upload-length";
/// Lowercase header name `upload-offset`.
pub const H_UPLOAD_OFFSET: &str = "upload-offset";
/// Lowercase header name `upload-metadata`.
pub const H_UPLOAD_METADATA: &str = "upload-metadata";
/// Lowercase header name `upload-concat`.
pub const H_UPLOAD_CONCAT: &str = "upload-concat";
/// Lowercase header name `upload-defer-length`.
pub const H_UPLOAD_DEFER_LENGTH: &str = "upload-defer-length";
/// Lowercase header name `upload-expires`.
pub const H_UPLOAD_EXPIRES: &str = "upload-expires";

// --- content headers and media types --------------------------------------

/// Lowercase header name `content-type`.
pub const H_CONTENT_TYPE: &str = "content-type";
/// Lowercase header name `content-length`.
pub const H_CONTENT_LENGTH: &str = "content-length";
/// Media type string `application/offset+octet-stream`.
pub const CT_OFFSET_OCTET_STREAM: &str = "application/offset+octet-stream";
121
/// Why a cancellation was signalled.
///
/// The value is published through [`CancellationContext`] and observed through
/// [`CancellationSignal`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CancellationReason {
    /// Published by [`CancellationContext::abort`].
    Abort,
    /// Published by [`CancellationContext::cancel`]; also what
    /// [`CancellationSignal::cancelled`] reports when the channel closes
    /// without any reason having been published.
    Cancel,
}
127
128#[derive(Clone, Debug)]
129pub struct CancellationSignal {
130 receiver: watch::Receiver<Option<CancellationReason>>,
131}
132
133impl CancellationSignal {
134 pub fn reason(&self) -> Option<CancellationReason> {
135 *self.receiver.borrow()
136 }
137
138 pub fn is_cancelled(&self) -> bool {
139 self.reason().is_some()
140 }
141
142 pub fn is_aborted(&self) -> bool {
143 matches!(self.reason(), Some(CancellationReason::Abort))
144 }
145
146 pub async fn cancelled(&mut self) -> CancellationReason {
147 loop {
148 if let Some(reason) = *self.receiver.borrow() {
149 return reason;
150 }
151 if self.receiver.changed().await.is_err() {
152 return CancellationReason::Cancel;
153 }
154 }
155 }
156}
157
158#[derive(Clone, Debug)]
159pub struct CancellationContext {
160 pub signal: CancellationSignal,
161 sender: watch::Sender<Option<CancellationReason>>,
162}
163
164impl CancellationContext {
165 pub fn new() -> Self {
166 let (sender, receiver) = watch::channel(None);
167 Self {
168 signal: CancellationSignal { receiver },
169 sender,
170 }
171 }
172
173 pub fn abort(&self) {
174 let _ = self.sender.send(Some(CancellationReason::Abort));
175 }
176
177 pub fn cancel(&self) {
178 let _ = self.sender.send(Some(CancellationReason::Cancel));
179 }
180}
181
182impl Default for CancellationContext {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188#[derive(Clone)]
189struct TusStateHoop {
190 state: Arc<Tus>,
191}
192
193#[handler]
194impl TusStateHoop {
195 async fn handle(&self, depot: &mut Depot) {
196 depot.inject(self.state.clone());
197 }
198}
199
200#[derive(Clone)]
201pub struct Tus {
202 options: TusOptions,
203 store: Arc<dyn DataStore>,
204}
205impl Default for Tus {
206 fn default() -> Self {
207 Self::new()
208 }
209}
210
211impl Tus {
213 pub fn new() -> Self {
214 Self {
215 options: TusOptions::default(),
216 store: Arc::new(DiskStore::new()),
217 }
218 }
219
220 pub fn path(mut self, path: impl Into<String>) -> Self {
221 self.options.path = path.into();
222 self
223 }
224
225 pub fn max_size(mut self, max_size: MaxSize) -> Self {
226 self.options.max_size = Some(max_size);
227 self
228 }
229
230 pub fn relative_location(mut self, yes: bool) -> Self {
231 self.options.relative_location = yes;
232 self
233 }
234
235 pub fn with_store(mut self, store: impl DataStore) -> Self {
236 self.store = Arc::new(store);
237 self
238 }
239
240 pub fn with_locker(mut self, locker: impl Locker) -> Self {
241 self.options.locker = Arc::new(locker);
242 self
243 }
244
245 pub fn into_router(self) -> Router {
246 let base_path = normalize_path(&self.options.path);
247 let state = Arc::new(self);
248
249 Router::with_path(base_path)
250 .hoop(TusStateHoop {
251 state: state.clone(),
252 })
253 .push(handlers::options_handler())
254 .push(handlers::post_handler())
255 .push(handlers::head_handler())
256 .push(handlers::patch_handler())
257 .push(handlers::delete_handler())
258 .push(handlers::get_handler())
259 }
260}
261
262impl Tus {
264 pub fn with_upload_id_naming_function<F, Fut>(mut self, f: F) -> Self
265 where
266 F: Fn(&Request, Option<Metadata>) -> Fut + Send + Sync + 'static,
267 Fut: Future<Output = Result<String, TusError>> + Send + 'static,
268 {
269 self.options.upload_id_naming_function = Arc::new(move |req, meta| Box::pin(f(req, meta)));
270 self
271 }
272
273 pub fn with_generate_url_function<F>(mut self, f: F) -> Self
274 where
275 F: Fn(&Request, GenerateUrlCtx) -> Result<String, TusError> + Send + Sync + 'static,
276 {
277 self.options.generate_url_function = Some(Arc::new(f));
278 self
279 }
280}
281
282impl Tus {
284 pub fn with_on_incoming_request<F, Fut>(mut self, f: F) -> Self
285 where
286 F: Fn(&Request, String) -> Fut + Send + Sync + 'static,
287 Fut: Future<Output = ()> + Send + 'static,
288 {
289 self.options.on_incoming_request = Some(Arc::new(move |req, id| Box::pin(f(req, id))));
290 self
291 }
292
293 pub fn with_on_incoming_request_sync<F>(mut self, f: F) -> Self
294 where
295 F: Fn(&Request, String) + Send + Sync + 'static,
296 {
297 self.options.on_incoming_request = Some(Arc::new(move |req, id| {
298 f(req, id);
299 Box::pin(async move {})
300 }));
301 self
302 }
303
304 pub fn with_on_upload_create<F, Fut>(mut self, f: F) -> Self
305 where
306 F: Fn(&Request, UploadInfo) -> Fut + Send + Sync + 'static,
307 Fut: Future<Output = Result<UploadPatch, TusError>> + Send + 'static,
308 {
309 self.options.on_upload_create = Some(Arc::new(move |req, upload| Box::pin(f(req, upload))));
310 self
311 }
312
313 pub fn with_on_upload_finish<F, Fut>(mut self, f: F) -> Self
314 where
315 F: Fn(&Request, UploadInfo) -> Fut + Send + Sync + 'static,
316 Fut: Future<Output = Result<UploadFinishPatch, TusError>> + Send + 'static,
317 {
318 self.options.on_upload_finish = Some(Arc::new(move |req, upload| Box::pin(f(req, upload))));
319 self
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    // ---- constants -------------------------------------------------------

    #[test]
    fn test_constants() {
        assert_eq!(TUS_VERSION, "1.0.0");
        assert_eq!(H_TUS_RESUMABLE, "tus-resumable");
        assert_eq!(H_TUS_VERSION, "tus-version");
        assert_eq!(H_TUS_EXTENSION, "tus-extension");
        assert_eq!(H_TUS_MAX_SIZE, "tus-max-size");
        assert_eq!(H_UPLOAD_LENGTH, "upload-length");
        assert_eq!(H_UPLOAD_OFFSET, "upload-offset");
        assert_eq!(H_UPLOAD_METADATA, "upload-metadata");
        assert_eq!(H_UPLOAD_CONCAT, "upload-concat");
        assert_eq!(H_UPLOAD_DEFER_LENGTH, "upload-defer-length");
        assert_eq!(H_UPLOAD_EXPIRES, "upload-expires");
        assert_eq!(H_CONTENT_TYPE, "content-type");
        assert_eq!(H_CONTENT_LENGTH, "content-length");
        assert_eq!(CT_OFFSET_OCTET_STREAM, "application/offset+octet-stream");
    }

    // ---- CancellationReason ----------------------------------------------

    #[test]
    fn test_cancellation_reason_equality() {
        assert_eq!(CancellationReason::Abort, CancellationReason::Abort);
        assert_eq!(CancellationReason::Cancel, CancellationReason::Cancel);
        assert_ne!(CancellationReason::Abort, CancellationReason::Cancel);
    }

    #[test]
    fn test_cancellation_reason_clone_copy() {
        let reason = CancellationReason::Abort;
        let cloned = reason.clone();
        let copied = reason;
        assert_eq!(reason, cloned);
        assert_eq!(reason, copied);
    }

    #[test]
    fn test_cancellation_reason_debug() {
        let debug = format!("{:?}", CancellationReason::Abort);
        assert_eq!(debug, "Abort");

        let debug = format!("{:?}", CancellationReason::Cancel);
        assert_eq!(debug, "Cancel");
    }

    // ---- CancellationContext / CancellationSignal ------------------------

    #[test]
    fn test_cancellation_context_new() {
        let ctx = CancellationContext::new();
        assert!(!ctx.signal.is_cancelled());
        assert!(!ctx.signal.is_aborted());
        assert!(ctx.signal.reason().is_none());
    }

    #[test]
    fn test_cancellation_context_default() {
        let ctx = CancellationContext::default();
        assert!(!ctx.signal.is_cancelled());
    }

    #[test]
    fn test_cancellation_context_abort() {
        let ctx = CancellationContext::new();
        ctx.abort();

        assert!(ctx.signal.is_cancelled());
        assert!(ctx.signal.is_aborted());
        assert_eq!(ctx.signal.reason(), Some(CancellationReason::Abort));
    }

    #[test]
    fn test_cancellation_context_cancel() {
        let ctx = CancellationContext::new();
        ctx.cancel();

        assert!(ctx.signal.is_cancelled());
        assert!(!ctx.signal.is_aborted());
        assert_eq!(ctx.signal.reason(), Some(CancellationReason::Cancel));
    }

    #[test]
    fn test_cancellation_signal_clone() {
        let ctx = CancellationContext::new();
        let signal1 = ctx.signal.clone();
        let signal2 = ctx.signal.clone();

        assert!(!signal1.is_cancelled());
        assert!(!signal2.is_cancelled());

        ctx.abort();

        assert!(signal1.is_cancelled());
        assert!(signal2.is_cancelled());
    }

    #[test]
    fn test_cancellation_context_clone() {
        let ctx1 = CancellationContext::new();
        let ctx2 = ctx1.clone();

        ctx1.abort();

        // Both contexts share one channel, so the clone observes the abort.
        assert!(ctx2.signal.is_cancelled());
    }

    #[tokio::test]
    async fn test_cancellation_signal_cancelled_async() {
        let ctx = CancellationContext::new();
        let mut signal = ctx.signal.clone();

        let ctx_clone = ctx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            ctx_clone.cancel();
        });

        let reason = signal.cancelled().await;
        assert_eq!(reason, CancellationReason::Cancel);
    }

    #[tokio::test]
    async fn test_cancellation_signal_cancelled_already_cancelled() {
        let ctx = CancellationContext::new();
        ctx.abort();

        let mut signal = ctx.signal.clone();
        let reason = signal.cancelled().await;
        assert_eq!(reason, CancellationReason::Abort);
    }

    #[tokio::test]
    async fn test_cancellation_signal_cancelled_sender_dropped() {
        let ctx = CancellationContext::new();
        let mut signal = ctx.signal.clone();

        // Dropping the only context drops the only sender and closes the
        // channel while no reason is stored.
        drop(ctx);

        let reason = signal.cancelled().await;
        assert_eq!(reason, CancellationReason::Cancel);
    }

    #[test]
    fn test_cancellation_context_debug() {
        let ctx = CancellationContext::new();
        let debug = format!("{:?}", ctx);
        assert!(debug.contains("CancellationContext"));
    }

    #[test]
    fn test_cancellation_signal_debug() {
        let ctx = CancellationContext::new();
        let debug = format!("{:?}", ctx.signal);
        assert!(debug.contains("CancellationSignal"));
    }

    // ---- Tus builder -----------------------------------------------------

    #[test]
    fn test_tus_new() {
        let tus = Tus::new();
        assert_eq!(tus.options.path, "/tus-files");
    }

    #[test]
    fn test_tus_default() {
        let tus = Tus::default();
        assert_eq!(tus.options.path, "/tus-files");
    }

    #[test]
    fn test_tus_path() {
        let tus = Tus::new().path("/custom/uploads");
        assert_eq!(tus.options.path, "/custom/uploads");
    }

    #[test]
    fn test_tus_max_size() {
        let tus = Tus::new().max_size(MaxSize::Fixed(1024 * 1024));
        match &tus.options.max_size {
            Some(MaxSize::Fixed(size)) => assert_eq!(*size, 1024 * 1024),
            _ => panic!("Expected Fixed max_size"),
        }
    }

    #[test]
    fn test_tus_relative_location() {
        let tus = Tus::new().relative_location(false);
        assert!(!tus.options.relative_location);

        let tus = Tus::new().relative_location(true);
        assert!(tus.options.relative_location);
    }

    #[test]
    fn test_tus_with_locker() {
        use lockers::memory_locker::MemoryLocker;

        let tus = Tus::new().with_locker(MemoryLocker::new());
        assert!(Arc::strong_count(&tus.options.locker) >= 1);
    }

    #[test]
    fn test_tus_with_store() {
        let tus = Tus::new().with_store(stores::DiskStore::new());
        assert!(Arc::strong_count(&tus.store) >= 1);
    }

    #[test]
    fn test_tus_into_router() {
        // Smoke test: building the router must not panic.
        let tus = Tus::new().path("/uploads");
        let _router = tus.into_router();
    }

    #[test]
    fn test_tus_clone() {
        let tus = Tus::new().path("/test");
        let cloned = tus.clone();
        assert_eq!(cloned.options.path, "/test");
    }

    #[test]
    fn test_tus_builder_chain() {
        let tus = Tus::new()
            .path("/api/tus")
            .max_size(MaxSize::Fixed(10 * 1024 * 1024))
            .relative_location(false);

        assert_eq!(tus.options.path, "/api/tus");
        assert!(!tus.options.relative_location);
        match &tus.options.max_size {
            Some(MaxSize::Fixed(size)) => assert_eq!(*size, 10 * 1024 * 1024),
            _ => panic!("Expected Fixed max_size"),
        }
    }

    // ---- Tus callbacks ---------------------------------------------------

    #[tokio::test]
    async fn test_tus_with_upload_id_naming_function() {
        let tus = Tus::new().with_upload_id_naming_function(|_req, _meta| async move {
            Ok("custom-id".to_string())
        });

        let req = Request::default();
        let result = (tus.options.upload_id_naming_function)(&req, None).await;
        assert_eq!(result.unwrap(), "custom-id");
    }

    #[test]
    fn test_tus_with_generate_url_function() {
        let tus = Tus::new().with_generate_url_function(|_req, ctx| {
            Ok(format!("https://cdn.example.com/{}", ctx.id))
        });

        assert!(tus.options.generate_url_function.is_some());
    }

    #[tokio::test]
    async fn test_tus_with_on_incoming_request() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let tus = Tus::new().with_on_incoming_request(move |_req, _id| {
            let called = called_clone.clone();
            async move {
                called.store(true, Ordering::SeqCst);
            }
        });

        assert!(tus.options.on_incoming_request.is_some());
        let req = Request::default();
        (tus.options.on_incoming_request.unwrap())(&req, "test-id".to_string()).await;
        assert!(called.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_tus_with_on_incoming_request_sync() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let tus = Tus::new().with_on_incoming_request_sync(move |_req, _id| {
            called_clone.store(true, Ordering::SeqCst);
        });

        assert!(tus.options.on_incoming_request.is_some());
        // Registering alone must not run the callback.
        assert!(!called.load(Ordering::SeqCst));

        // Invoke the stored hook so the test checks that the callback runs,
        // not merely that something was stored.
        let req = Request::default();
        (tus.options.on_incoming_request.unwrap())(&req, "test-id".to_string()).await;
        assert!(called.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_tus_with_on_upload_create() {
        let tus = Tus::new()
            .with_on_upload_create(|_req, _upload| async move { Ok(UploadPatch::default()) });

        assert!(tus.options.on_upload_create.is_some());
    }

    #[tokio::test]
    async fn test_tus_with_on_upload_finish() {
        let tus = Tus::new()
            .with_on_upload_finish(|_req, _upload| async move { Ok(UploadFinishPatch::default()) });

        assert!(tus.options.on_upload_finish.is_some());
    }
}