1mod delete;
2mod get;
3mod head;
4mod options;
5mod patch;
6mod post;
7
8use std::sync::Arc;
9
10use bytes::Bytes;
11use http::{HeaderMap, HeaderValue, Method, StatusCode, Uri};
12use tokio::sync::broadcast;
13
14use crate::{
15 config::Config,
16 error::TusError,
17 hooks::{HookEvent, HookSender},
18 lock::SendLocker,
19 proto::{
20 HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, HDR_CACHE_CONTROL, HDR_TUS_RESUMABLE, TUS_VERSION,
21 },
22 store::SendDataStore,
23 util::static_header,
24};
25
26pub enum TusBody {
28 Bytes(Bytes),
29 Reader(Box<dyn tokio::io::AsyncRead + Send + Unpin>),
31}
32
33impl std::fmt::Debug for TusBody {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::Bytes(b) => f.debug_tuple("Bytes").field(b).finish(),
37 Self::Reader(_) => f.write_str("Reader(..)"),
38 }
39 }
40}
41
42pub struct TusRequest {
45 pub method: Method,
46 pub uri: Uri,
47 pub upload_id: Option<String>,
50 pub headers: HeaderMap,
51 pub body: Option<Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin>>,
53}
54
55pub struct TusResponse {
58 pub status: StatusCode,
59 pub headers: HeaderMap,
60 pub body: TusBody,
61}
62
63impl TusResponse {
64 pub fn bytes_slice(&self) -> Option<&[u8]> {
66 match &self.body {
67 TusBody::Bytes(b) => Some(b.as_ref()),
68 TusBody::Reader(_) => None,
69 }
70 }
71}
72
73pub struct TusHandler<S, L = NoLocker> {
82 pub(crate) store: S,
83 pub(crate) locker: Option<L>,
84 pub(crate) config: Arc<Config>,
85 pub(crate) hook_tx: Option<HookSender>,
86}
87
88impl<S, L> TusHandler<S, L>
89where
90 S: SendDataStore + Send + Sync + 'static,
91 L: SendLocker + Send + Sync + 'static,
92{
93 pub fn new(store: S, locker: Option<L>, config: Config) -> Self {
94 let hook_tx = if config.hooks.channel_capacity > 0 {
95 let (tx, _) = broadcast::channel(config.hooks.channel_capacity);
96 Some(tx)
97 } else {
98 None
99 };
100 Self {
101 store,
102 locker,
103 config: Arc::new(config),
104 hook_tx,
105 }
106 }
107
108 pub fn hook_receiver(&self) -> Option<broadcast::Receiver<HookEvent>> {
110 self.hook_tx.as_ref().map(|tx| tx.subscribe())
111 }
112
113 pub async fn handle(&self, req: TusRequest) -> TusResponse {
115 let result = match req.method {
116 Method::OPTIONS => options::handle(self, &req).await,
117 Method::HEAD => head::handle(self, &req).await,
118 Method::GET => get::handle(self, &req).await,
119 Method::POST => post::handle(self, req).await,
120 Method::PATCH => patch::handle(self, req).await,
121 Method::DELETE => delete::handle(self, &req).await,
122 _ => Err(TusError::MethodNotAllowed),
123 };
124 match result {
125 Ok(resp) => resp,
126 Err(err) => self.error_response(err),
127 }
128 }
129
130 pub(crate) fn response(
132 &self,
133 status: StatusCode,
134 extra_headers: HeaderMap,
135 body: Bytes,
136 ) -> TusResponse {
137 self.response_with_body(status, extra_headers, TusBody::Bytes(body))
138 }
139
140 pub(crate) fn response_with_body(
141 &self,
142 status: StatusCode,
143 extra_headers: HeaderMap,
144 body: TusBody,
145 ) -> TusResponse {
146 let mut headers = self.base_headers();
147 headers.extend(extra_headers);
148 TusResponse {
149 status,
150 headers,
151 body,
152 }
153 }
154
155 pub(crate) fn error_response(&self, err: TusError) -> TusResponse {
157 let status = err.status_code();
158 let body = Bytes::from(err.to_string());
159 let mut headers = self.base_headers();
160 headers.insert(
161 http::header::CONTENT_TYPE,
162 HeaderValue::from_static("text/plain; charset=utf-8"),
163 );
164 TusResponse {
165 status,
166 headers,
167 body: TusBody::Bytes(body),
168 }
169 }
170
171 fn base_headers(&self) -> HeaderMap {
173 let mut h = HeaderMap::new();
174 h.insert(HDR_TUS_RESUMABLE, static_header(TUS_VERSION));
175 h.insert(HDR_CACHE_CONTROL, static_header("no-store"));
176 let cors = &self.config.cors;
177 if cors.enabled {
178 if let Ok(v) = HeaderValue::from_str(&cors.allow_origin) {
179 h.insert(crate::proto::HDR_ACCESS_CONTROL_ALLOW_ORIGIN, v);
180 }
181 if cors.allow_credentials {
182 h.insert(HDR_ACCESS_CONTROL_ALLOW_CREDENTIALS, static_header("true"));
183 }
184 let mut expose = String::from(
185 "Upload-Offset,Upload-Length,Upload-Metadata,Upload-Expires,\
186 Upload-Defer-Length,Location,Tus-Resumable,Tus-Version,Tus-Extension,\
187 Tus-Max-Size,Tus-Checksum-Algorithm,Content-Length,Content-Type",
188 );
189 for extra in &cors.extra_expose_headers {
190 expose.push(',');
191 expose.push_str(extra.trim());
192 }
193 if let Ok(v) = expose.parse() {
194 h.insert(crate::proto::HDR_ACCESS_CONTROL_EXPOSE_HEADERS, v);
195 }
196 }
197 h
198 }
199
200 pub(crate) fn emit(&self, event: HookEvent) {
202 if let Some(tx) = &self.hook_tx {
203 let _ = tx.send(event);
204 }
205 }
206}
207
/// Locker that performs no locking: acquiring always succeeds and releasing
/// does nothing. It is the default locker type parameter of `TusHandler`.
#[derive(Debug, Default)]
pub struct NoLocker;
210
211impl crate::lock::SendLock for NoLocker {
212 async fn release(self) -> Result<(), TusError> {
213 Ok(())
214 }
215}
216
217impl crate::lock::SendLocker for NoLocker {
218 type LockType = NoLocker;
219 async fn acquire(&self, _id: &crate::info::UploadId) -> Result<NoLocker, TusError> {
220 Ok(NoLocker)
221 }
222}