fileloft_store_memory/
store.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::BytesMut;
5use tokio::io::AsyncReadExt;
6use tokio::sync::RwLock;
7use fileloft_core::{
8 error::TusError,
9 info::{UploadId, UploadInfo},
10 store::{SendDataStore, SendUpload},
11};
12
13#[derive(Debug)]
14struct MemoryUploadState {
15 info: UploadInfo,
16 data: BytesMut,
17}
18
19type StoreMap = Arc<RwLock<HashMap<String, MemoryUploadState>>>;
20
21#[derive(Clone)]
27pub struct MemoryStore {
28 state: StoreMap,
29}
30
31impl MemoryStore {
32 pub fn new() -> Self {
33 Self {
34 state: Arc::new(RwLock::new(HashMap::new())),
35 }
36 }
37}
38
39impl Default for MemoryStore {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl SendDataStore for MemoryStore {
46 type UploadType = MemoryUpload;
47
48 async fn create_upload(&self, info: UploadInfo) -> Result<MemoryUpload, TusError> {
49 let id = info.id.clone();
50 let mut state = self.state.write().await;
51 state.insert(
52 id.as_str().to_string(),
53 MemoryUploadState {
54 info,
55 data: BytesMut::new(),
56 },
57 );
58 Ok(MemoryUpload {
59 id,
60 store: Arc::clone(&self.state),
61 })
62 }
63
64 async fn get_upload(&self, id: &UploadId) -> Result<MemoryUpload, TusError> {
65 let state = self.state.read().await;
66 if state.contains_key(id.as_str()) {
67 Ok(MemoryUpload {
68 id: id.clone(),
69 store: Arc::clone(&self.state),
70 })
71 } else {
72 Err(TusError::NotFound(id.to_string()))
73 }
74 }
75}
76
77pub struct MemoryUpload {
79 id: UploadId,
80 store: StoreMap,
81}
82
83impl SendUpload for MemoryUpload {
84 async fn write_chunk(
85 &mut self,
86 offset: u64,
87 reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
88 ) -> Result<u64, TusError> {
89 let mut buf = Vec::new();
90 reader.read_to_end(&mut buf).await?;
91 let n = buf.len() as u64;
92
93 let mut state = self.store.write().await;
94 let entry = state
95 .get_mut(self.id.as_str())
96 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
97
98 let end_offset = offset.checked_add(n).ok_or_else(|| {
99 TusError::Internal("upload offset overflow".into())
100 })?;
101 if let Some(declared) = entry.info.size {
102 if end_offset > declared {
103 return Err(TusError::ExceedsUploadLength {
104 declared,
105 end: end_offset,
106 });
107 }
108 }
109
110 let end = (offset + n) as usize;
112 if entry.data.len() < end {
113 entry.data.resize(end, 0);
114 }
115 entry.data[offset as usize..end].copy_from_slice(&buf);
116 entry.info.offset = offset + n;
117 Ok(n)
118 }
119
120 async fn get_info(&self) -> Result<UploadInfo, TusError> {
121 let state = self.store.read().await;
122 state
123 .get(self.id.as_str())
124 .map(|s| s.info.clone())
125 .ok_or_else(|| TusError::NotFound(self.id.to_string()))
126 }
127
128 async fn finalize(&mut self) -> Result<(), TusError> {
129 Ok(())
131 }
132
133 async fn delete(self) -> Result<(), TusError> {
134 let mut state = self.store.write().await;
135 state
136 .remove(self.id.as_str())
137 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
138 Ok(())
139 }
140
141 async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
142 let mut state = self.store.write().await;
143 let entry = state
144 .get_mut(self.id.as_str())
145 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
146 if entry.info.size.is_some() {
147 return Err(TusError::UploadLengthAlreadySet);
148 }
149 entry.info.size = Some(length);
150 entry.info.size_is_deferred = false;
151 Ok(())
152 }
153
154 async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
155 let mut combined = BytesMut::new();
157 {
158 let state = self.store.read().await;
159 for partial in partials {
160 let entry = state
161 .get(partial.id.as_str())
162 .ok_or_else(|| TusError::NotFound(partial.id.to_string()))?;
163 combined.extend_from_slice(&entry.data);
164 }
165 }
166
167 let total = combined.len() as u64;
168 let mut state = self.store.write().await;
169 let entry = state
170 .get_mut(self.id.as_str())
171 .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
172 entry.data = combined;
173 entry.info.size = Some(total);
174 entry.info.offset = total;
175 entry.info.is_final = true;
176 Ok(())
177 }
178}
179
180pub async fn get_upload_data(store: &MemoryStore, id: &UploadId) -> Option<bytes::Bytes> {
182 let state = store.state.read().await;
183 state
184 .get(id.as_str())
185 .map(|s| bytes::Bytes::copy_from_slice(&s.data))
186}