fileloft_store_memory/
store.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::BytesMut;
5use fileloft_core::{
6 error::TusError,
7 info::{UploadId, UploadInfo},
8 store::{SendDataStore, SendUpload},
9};
10use std::io::Cursor;
11
12use tokio::io::AsyncReadExt;
13use tokio::sync::RwLock;
14
15#[derive(Debug)]
16struct MemoryUploadState {
17 info: UploadInfo,
18 data: BytesMut,
19}
20
21type StoreMap = Arc<RwLock<HashMap<String, MemoryUploadState>>>;
22
23#[derive(Clone)]
29pub struct MemoryStore {
30 state: StoreMap,
31}
32
33impl MemoryStore {
34 pub fn new() -> Self {
35 Self {
36 state: Arc::new(RwLock::new(HashMap::new())),
37 }
38 }
39}
40
41impl Default for MemoryStore {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl SendDataStore for MemoryStore {
48 type UploadType = MemoryUpload;
49
50 async fn create_upload(&self, info: UploadInfo) -> Result<MemoryUpload, TusError> {
51 let id = info.id.clone();
52 let mut state = self.state.write().await;
53 state.insert(
54 id.as_str().to_string(),
55 MemoryUploadState {
56 info,
57 data: BytesMut::new(),
58 },
59 );
60 Ok(MemoryUpload {
61 id,
62 store: Arc::clone(&self.state),
63 })
64 }
65
66 async fn get_upload(&self, id: &UploadId) -> Result<MemoryUpload, TusError> {
67 let state = self.state.read().await;
68 if state.contains_key(id.as_str()) {
69 Ok(MemoryUpload {
70 id: id.clone(),
71 store: Arc::clone(&self.state),
72 })
73 } else {
74 Err(TusError::NotFound(id.to_string()))
75 }
76 }
77}
78
79pub struct MemoryUpload {
81 id: UploadId,
82 store: StoreMap,
83}
84
85impl SendUpload for MemoryUpload {
86 async fn write_chunk(
87 &mut self,
88 offset: u64,
89 reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
90 ) -> Result<u64, TusError> {
91 let mut buf = Vec::new();
92 reader.read_to_end(&mut buf).await?;
93 let n = buf.len() as u64;
94
95 let mut state = self.store.write().await;
96 let entry = state
97 .get_mut(self.id.as_str())
98 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
99
100 let end_offset = offset
101 .checked_add(n)
102 .ok_or_else(|| TusError::Internal("upload offset overflow".into()))?;
103 if let Some(declared) = entry.info.size {
104 if end_offset > declared {
105 return Err(TusError::ExceedsUploadLength {
106 declared,
107 end: end_offset,
108 });
109 }
110 }
111
112 let end = (offset + n) as usize;
114 if entry.data.len() < end {
115 entry.data.resize(end, 0);
116 }
117 entry.data[offset as usize..end].copy_from_slice(&buf);
118 entry.info.offset = offset + n;
119 Ok(n)
120 }
121
122 async fn get_info(&self) -> Result<UploadInfo, TusError> {
123 let state = self.store.read().await;
124 state
125 .get(self.id.as_str())
126 .map(|s| s.info.clone())
127 .ok_or_else(|| TusError::NotFound(self.id.to_string()))
128 }
129
130 async fn finalize(&mut self) -> Result<(), TusError> {
131 Ok(())
133 }
134
135 async fn delete(self) -> Result<(), TusError> {
136 let mut state = self.store.write().await;
137 state
138 .remove(self.id.as_str())
139 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
140 Ok(())
141 }
142
143 async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
144 let mut state = self.store.write().await;
145 let entry = state
146 .get_mut(self.id.as_str())
147 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
148 if entry.info.size.is_some() {
149 return Err(TusError::UploadLengthAlreadySet);
150 }
151 entry.info.size = Some(length);
152 entry.info.size_is_deferred = false;
153 Ok(())
154 }
155
156 async fn read_content(&self) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, TusError> {
157 let state = self.store.read().await;
158 let entry = state
159 .get(self.id.as_str())
160 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
161 if !entry.info.is_complete() {
162 return Err(TusError::UploadNotReadyForDownload);
163 }
164 let bytes = bytes::Bytes::copy_from_slice(&entry.data);
165 Ok(Box::new(Cursor::new(bytes)))
166 }
167
168 async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
169 let mut combined = BytesMut::new();
171 {
172 let state = self.store.read().await;
173 for partial in partials {
174 let entry = state
175 .get(partial.id.as_str())
176 .ok_or_else(|| TusError::NotFound(partial.id.to_string()))?;
177 combined.extend_from_slice(&entry.data);
178 }
179 }
180
181 let total = combined.len() as u64;
182 let mut state = self.store.write().await;
183 let entry = state
184 .get_mut(self.id.as_str())
185 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
186 entry.data = combined;
187 entry.info.size = Some(total);
188 entry.info.offset = total;
189 entry.info.is_final = true;
190 Ok(())
191 }
192}
193
194pub async fn get_upload_data(store: &MemoryStore, id: &UploadId) -> Option<bytes::Bytes> {
196 let state = store.state.read().await;
197 state
198 .get(id.as_str())
199 .map(|s| bytes::Bytes::copy_from_slice(&s.data))
200}