Skip to main content

fileloft_core/handler/
mod.rs

1mod delete;
2mod get;
3mod head;
4mod options;
5mod patch;
6mod post;
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use http::{HeaderMap, HeaderValue, Method, StatusCode, Uri};
12use tokio::sync::broadcast;
13
14use crate::{
15    config::Config,
16    error::TusError,
17    hooks::{HookEvent, HookSender},
18    lock::SendLocker,
19    proto::{
20        HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, HDR_CACHE_CONTROL, HDR_TUS_RESUMABLE, TUS_VERSION,
21    },
22    store::SendDataStore,
23    util::static_header,
24};
25
26/// Response body: small protocol messages or a streamed download.
27pub enum TusBody {
28    Bytes(Bytes),
29    /// Streamed body (e.g. GET download). Framework layers map this to a streaming HTTP body.
30    Reader(Box<dyn tokio::io::AsyncRead + Send + Unpin>),
31}
32
33impl std::fmt::Debug for TusBody {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Bytes(b) => f.debug_tuple("Bytes").field(b).finish(),
37            Self::Reader(_) => f.write_str("Reader(..)"),
38        }
39    }
40}
41
42/// Incoming request as seen by the tus handler.
43/// Framework integrations construct this from their native request type.
44pub struct TusRequest {
45    pub method: Method,
46    pub uri: Uri,
47    /// Upload ID extracted from the URL path by the framework router.
48    /// Present for HEAD / PATCH / DELETE / GET; absent for OPTIONS and POST.
49    pub upload_id: Option<String>,
50    pub headers: HeaderMap,
51    /// Streaming body. `None` for HEAD / DELETE / OPTIONS / GET.
52    pub body: Option<Box<dyn tokio::io::AsyncRead + Send + Unpin>>,
53}
54
55/// Outgoing response produced by the tus handler.
56/// Framework integrations convert this into their native response type.
57pub struct TusResponse {
58    pub status: StatusCode,
59    pub headers: HeaderMap,
60    pub body: TusBody,
61}
62
63impl TusResponse {
64    /// When the body is [`TusBody::Bytes`], returns its slice (for tests and small responses).
65    pub fn bytes_slice(&self) -> Option<&[u8]> {
66        match &self.body {
67            TusBody::Bytes(b) => Some(b.as_ref()),
68            TusBody::Reader(_) => None,
69        }
70    }
71}
72
73/// The central tus protocol handler.
74///
75/// Wrap in `Arc<TusHandler<S, L>>` and share across request-handling tasks.
76///
77/// # Type Parameters
78/// - `S`: Storage backend implementing [`SendDataStore`].
79/// - `L`: Optional locker implementing [`SendLocker`]. Use `NoLocker` if concurrency
80///   control is handled by the store itself or is not needed.
81pub struct TusHandler<S, L = NoLocker> {
82    pub(crate) store: S,
83    pub(crate) locker: Option<L>,
84    pub(crate) config: Arc<Config>,
85    pub(crate) hook_tx: Option<HookSender>,
86}
87
88impl<S, L> TusHandler<S, L>
89where
90    S: SendDataStore + Send + Sync + 'static,
91    L: SendLocker + Send + Sync + 'static,
92{
93    pub fn new(store: S, locker: Option<L>, config: Config) -> Self {
94        let hook_tx = if config.hooks.channel_capacity > 0 {
95            let (tx, _) = broadcast::channel(config.hooks.channel_capacity);
96            Some(tx)
97        } else {
98            None
99        };
100        Self {
101            store,
102            locker,
103            config: Arc::new(config),
104            hook_tx,
105        }
106    }
107
108    /// Subscribe to lifecycle events. Returns `None` if hooks are not configured.
109    pub fn hook_receiver(&self) -> Option<broadcast::Receiver<HookEvent>> {
110        self.hook_tx.as_ref().map(|tx| tx.subscribe())
111    }
112
113    /// Maximum declared upload size in bytes. `0` means no core-level limit.
114    pub fn max_size(&self) -> u64 {
115        self.config.max_size
116    }
117
118    /// Main dispatch — routes to the appropriate sub-handler.
119    pub async fn handle(&self, req: TusRequest) -> TusResponse {
120        let result = match req.method {
121            Method::OPTIONS => options::handle(self, req).await,
122            Method::HEAD => head::handle(self, req).await,
123            Method::GET => get::handle(self, req).await,
124            Method::POST => post::handle(self, req).await,
125            Method::PATCH => patch::handle(self, req).await,
126            Method::DELETE => delete::handle(self, req).await,
127            _ => Err(TusError::MethodNotAllowed),
128        };
129        match result {
130            Ok(resp) => resp,
131            Err(err) => self.error_response(err),
132        }
133    }
134
135    /// Build a response with common tus headers added.
136    pub(crate) fn response(
137        &self,
138        status: StatusCode,
139        extra_headers: HeaderMap,
140        body: Bytes,
141    ) -> TusResponse {
142        self.response_with_body(status, extra_headers, TusBody::Bytes(body))
143    }
144
145    pub(crate) fn response_with_body(
146        &self,
147        status: StatusCode,
148        extra_headers: HeaderMap,
149        body: TusBody,
150    ) -> TusResponse {
151        let mut headers = self.base_headers();
152        headers.extend(extra_headers);
153        TusResponse {
154            status,
155            headers,
156            body,
157        }
158    }
159
160    /// Build an error response from a `TusError`.
161    pub(crate) fn error_response(&self, err: TusError) -> TusResponse {
162        let status = err.status_code();
163        if status.is_server_error() {
164            tracing::error!(error = %err, "internal tus handler error");
165        }
166        let body = Bytes::from(err.client_message());
167        let mut headers = self.base_headers();
168        headers.insert(
169            http::header::CONTENT_TYPE,
170            HeaderValue::from_static("text/plain; charset=utf-8"),
171        );
172        TusResponse {
173            status,
174            headers,
175            body: TusBody::Bytes(body),
176        }
177    }
178
179    /// Headers added to every response.
180    fn base_headers(&self) -> HeaderMap {
181        let mut h = HeaderMap::new();
182        h.insert(HDR_TUS_RESUMABLE, static_header(TUS_VERSION));
183        h.insert(HDR_CACHE_CONTROL, static_header("no-store"));
184        let cors = &self.config.cors;
185        if cors.enabled {
186            if let Ok(v) = HeaderValue::from_str(&cors.allow_origin) {
187                h.insert(crate::proto::HDR_ACCESS_CONTROL_ALLOW_ORIGIN, v);
188            }
189            if cors.allow_credentials {
190                h.insert(HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, static_header("true"));
191            }
192            let mut expose = String::from(
193                "Upload-Offset,Upload-Length,Upload-Metadata,Upload-Expires,\
194                 Upload-Defer-Length,Location,Tus-Resumable,Tus-Version,Tus-Extension,\
195                 Tus-Max-Size,Tus-Checksum-Algorithm,Content-Length,Content-Type",
196            );
197            for extra in &cors.extra_expose_headers {
198                expose.push(',');
199                expose.push_str(extra.trim());
200            }
201            if let Ok(v) = expose.parse() {
202                h.insert(crate::proto::HDR_ACCESS_CONTROL_EXPOSE_HEADERS, v);
203            }
204        }
205        h
206    }
207
208    /// Emit a hook event (non-blocking; missed if no subscriber).
209    pub(crate) fn emit(&self, event: HookEvent) {
210        if let Some(tx) = &self.hook_tx {
211            let _ = tx.send(event);
212        }
213    }
214}
215
/// A no-op locker used when the caller passes `None` for the locker type.
///
/// It is the default for the `L` type parameter of `TusHandler`. The type
/// carries no state, and the same type doubles as its own lock value.
#[derive(Debug)]
pub struct NoLocker;
218
219impl crate::lock::SendLock for NoLocker {
220    async fn release(self) -> Result<(), TusError> {
221        Ok(())
222    }
223}
224
225impl crate::lock::SendLocker for NoLocker {
226    type LockType = NoLocker;
227    async fn acquire(&self, _id: &crate::info::UploadId) -> Result<NoLocker, TusError> {
228        Ok(NoLocker)
229    }
230}