Skip to main content

fileloft_core/handler/
mod.rs

1mod delete;
2mod get;
3mod head;
4mod options;
5mod patch;
6mod post;
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use http::{HeaderMap, HeaderValue, Method, StatusCode, Uri};
12use tokio::sync::broadcast;
13
14use crate::{
15    config::Config,
16    error::TusError,
17    hooks::{HookEvent, HookSender},
18    lock::SendLocker,
19    proto::{
20        HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, HDR_CACHE_CONTROL, HDR_TUS_RESUMABLE, TUS_VERSION,
21    },
22    store::SendDataStore,
23    util::static_header,
24};
25
26/// Response body: small protocol messages or a streamed download.
27pub enum TusBody {
28    Bytes(Bytes),
29    /// Streamed body (e.g. GET download). Framework layers map this to a streaming HTTP body.
30    Reader(Box<dyn tokio::io::AsyncRead + Send + Unpin>),
31}
32
33impl std::fmt::Debug for TusBody {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Bytes(b) => f.debug_tuple("Bytes").field(b).finish(),
37            Self::Reader(_) => f.write_str("Reader(..)"),
38        }
39    }
40}
41
42/// Incoming request as seen by the tus handler.
43/// Framework integrations construct this from their native request type.
44pub struct TusRequest {
45    pub method: Method,
46    pub uri: Uri,
47    /// Upload ID extracted from the URL path by the framework router.
48    /// Present for HEAD / PATCH / DELETE / GET; absent for OPTIONS and POST.
49    pub upload_id: Option<String>,
50    pub headers: HeaderMap,
51    /// Streaming body. `None` for HEAD / DELETE / OPTIONS / GET.
52    pub body: Option<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>>,
53}
54
55/// Outgoing response produced by the tus handler.
56/// Framework integrations convert this into their native response type.
57pub struct TusResponse {
58    pub status: StatusCode,
59    pub headers: HeaderMap,
60    pub body: TusBody,
61}
62
63impl TusResponse {
64    /// When the body is [`TusBody::Bytes`], returns its slice (for tests and small responses).
65    pub fn bytes_slice(&self) -> Option<&[u8]> {
66        match &self.body {
67            TusBody::Bytes(b) => Some(b.as_ref()),
68            TusBody::Reader(_) => None,
69        }
70    }
71}
72
73/// The central tus protocol handler.
74///
75/// Wrap in `Arc<TusHandler<S, L>>` and share across request-handling tasks.
76///
77/// # Type Parameters
78/// - `S`: Storage backend implementing [`SendDataStore`].
79/// - `L`: Optional locker implementing [`SendLocker`]. Use `NoLocker` if concurrency
80///   control is handled by the store itself or is not needed.
81pub struct TusHandler<S, L = NoLocker> {
82    pub(crate) store: S,
83    pub(crate) locker: Option<L>,
84    pub(crate) config: Arc<Config>,
85    pub(crate) hook_tx: Option<HookSender>,
86}
87
88impl<S, L> TusHandler<S, L>
89where
90    S: SendDataStore + Send + Sync + 'static,
91    L: SendLocker + Send + Sync + 'static,
92{
93    pub fn new(store: S, locker: Option<L>, config: Config) -> Self {
94        let hook_tx = if config.hooks.channel_capacity > 0 {
95            let (tx, _) = broadcast::channel(config.hooks.channel_capacity);
96            Some(tx)
97        } else {
98            None
99        };
100        Self {
101            store,
102            locker,
103            config: Arc::new(config),
104            hook_tx,
105        }
106    }
107
108    /// Subscribe to lifecycle events. Returns `None` if hooks are not configured.
109    pub fn hook_receiver(&self) -> Option<broadcast::Receiver<HookEvent>> {
110        self.hook_tx.as_ref().map(|tx| tx.subscribe())
111    }
112
113    /// Main dispatch — routes to the appropriate sub-handler.
114    pub async fn handle(&self, req: TusRequest) -> TusResponse {
115        let result = match req.method {
116            Method::OPTIONS => options::handle(self, &req).await,
117            Method::HEAD => head::handle(self, &req).await,
118            Method::GET => get::handle(self, &req).await,
119            Method::POST => post::handle(self, req).await,
120            Method::PATCH => patch::handle(self, req).await,
121            Method::DELETE => delete::handle(self, &req).await,
122            _ => Err(TusError::MethodNotAllowed),
123        };
124        match result {
125            Ok(resp) => resp,
126            Err(err) => self.error_response(err),
127        }
128    }
129
130    /// Build a response with common tus headers added.
131    pub(crate) fn response(
132        &self,
133        status: StatusCode,
134        extra_headers: HeaderMap,
135        body: Bytes,
136    ) -> TusResponse {
137        self.response_with_body(status, extra_headers, TusBody::Bytes(body))
138    }
139
140    pub(crate) fn response_with_body(
141        &self,
142        status: StatusCode,
143        extra_headers: HeaderMap,
144        body: TusBody,
145    ) -> TusResponse {
146        let mut headers = self.base_headers();
147        headers.extend(extra_headers);
148        TusResponse {
149            status,
150            headers,
151            body,
152        }
153    }
154
155    /// Build an error response from a `TusError`.
156    pub(crate) fn error_response(&self, err: TusError) -> TusResponse {
157        let status = err.status_code();
158        let body = Bytes::from(err.to_string());
159        let mut headers = self.base_headers();
160        headers.insert(
161            http::header::CONTENT_TYPE,
162            HeaderValue::from_static("text/plain; charset=utf-8"),
163        );
164        TusResponse {
165            status,
166            headers,
167            body: TusBody::Bytes(body),
168        }
169    }
170
171    /// Headers added to every response.
172    fn base_headers(&self) -> HeaderMap {
173        let mut h = HeaderMap::new();
174        h.insert(HDR_TUS_RESUMABLE, static_header(TUS_VERSION));
175        h.insert(HDR_CACHE_CONTROL, static_header("no-store"));
176        let cors = &self.config.cors;
177        if cors.enabled {
178            if let Ok(v) = HeaderValue::from_str(&cors.allow_origin) {
179                h.insert(crate::proto::HDR_ACCESS_CONTROL_ALLOW_ORIGIN, v);
180            }
181            if cors.allow_credentials {
182                h.insert(HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, static_header("true"));
183            }
184            let mut expose = String::from(
185                "Upload-Offset,Upload-Length,Upload-Metadata,Upload-Expires,\
186                 Upload-Defer-Length,Location,Tus-Resumable,Tus-Version,Tus-Extension,\
187                 Tus-Max-Size,Tus-Checksum-Algorithm,Content-Length,Content-Type",
188            );
189            for extra in &cors.extra_expose_headers {
190                expose.push(',');
191                expose.push_str(extra.trim());
192            }
193            if let Ok(v) = expose.parse() {
194                h.insert(crate::proto::HDR_ACCESS_CONTROL_EXPOSE_HEADERS, v);
195            }
196        }
197        h
198    }
199
200    /// Emit a hook event (non-blocking; missed if no subscriber).
201    pub(crate) fn emit(&self, event: HookEvent) {
202        if let Some(tx) = &self.hook_tx {
203            let _ = tx.send(event);
204        }
205    }
206}
207
/// A no-op locker: the default locker type parameter of `TusHandler`.
///
/// Use it when no external locking is wanted. Acquiring always succeeds
/// immediately and releasing does nothing. The type is a stateless unit
/// struct, so it derives the common traits for convenience.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoLocker;
210
211impl crate::lock::SendLock for NoLocker {
212    async fn release(self) -> Result<(), TusError> {
213        Ok(())
214    }
215}
216
217impl crate::lock::SendLocker for NoLocker {
218    type LockType = NoLocker;
219    async fn acquire(&self, _id: &crate::info::UploadId) -> Result<NoLocker, TusError> {
220        Ok(NoLocker)
221    }
222}