Skip to main content

fileloft_store_memory/
store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::BytesMut;
5use fileloft_core::{
6    error::TusError,
7    info::{UploadId, UploadInfo},
8    store::{SendDataStore, SendUpload},
9};
10use std::io::Cursor;
11
12use tokio::io::AsyncReadExt;
13use tokio::sync::RwLock;
14
15#[derive(Debug)]
16struct MemoryUploadState {
17    info: UploadInfo,
18    data: BytesMut,
19}
20
21type StoreMap = Arc<RwLock<HashMap<String, MemoryUploadState>>>;
22
23/// An in-memory tus storage backend.
24///
25/// Data is held in a shared `Arc<RwLock<HashMap>>` so all handles and clones
26/// refer to the same underlying state. Suitable for testing and development;
27/// not intended for production use (no persistence, no multi-process support).
28#[derive(Clone)]
29pub struct MemoryStore {
30    state: StoreMap,
31}
32
33impl MemoryStore {
34    pub fn new() -> Self {
35        Self {
36            state: Arc::new(RwLock::new(HashMap::new())),
37        }
38    }
39}
40
41impl Default for MemoryStore {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl SendDataStore for MemoryStore {
48    type UploadType = MemoryUpload;
49
50    async fn create_upload(&self, info: UploadInfo) -> Result<MemoryUpload, TusError> {
51        let id = info.id.clone();
52        let mut state = self.state.write().await;
53        state.insert(
54            id.as_str().to_string(),
55            MemoryUploadState {
56                info,
57                data: BytesMut::new(),
58            },
59        );
60        Ok(MemoryUpload {
61            id,
62            store: Arc::clone(&self.state),
63        })
64    }
65
66    async fn get_upload(&self, id: &UploadId) -> Result<MemoryUpload, TusError> {
67        let state = self.state.read().await;
68        if state.contains_key(id.as_str()) {
69            Ok(MemoryUpload {
70                id: id.clone(),
71                store: Arc::clone(&self.state),
72            })
73        } else {
74            Err(TusError::NotFound(id.to_string()))
75        }
76    }
77}
78
79/// Handle to a single in-memory upload.
80pub struct MemoryUpload {
81    id: UploadId,
82    store: StoreMap,
83}
84
85impl SendUpload for MemoryUpload {
86    async fn write_chunk(
87        &mut self,
88        offset: u64,
89        reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
90    ) -> Result<u64, TusError> {
91        let mut buf = Vec::new();
92        reader.read_to_end(&mut buf).await?;
93        let n = buf.len() as u64;
94
95        let mut state = self.store.write().await;
96        let entry = state
97            .get_mut(self.id.as_str())
98            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
99
100        let end_offset = offset
101            .checked_add(n)
102            .ok_or_else(|| TusError::Internal("upload offset overflow".into()))?;
103        if let Some(declared) = entry.info.size {
104            if end_offset > declared {
105                return Err(TusError::ExceedsUploadLength {
106                    declared,
107                    end: end_offset,
108                });
109            }
110        }
111
112        // Ensure the buffer is large enough
113        let end = (offset + n) as usize;
114        if entry.data.len() < end {
115            entry.data.resize(end, 0);
116        }
117        entry.data[offset as usize..end].copy_from_slice(&buf);
118        entry.info.offset = offset + n;
119        Ok(n)
120    }
121
122    async fn get_info(&self) -> Result<UploadInfo, TusError> {
123        let state = self.store.read().await;
124        state
125            .get(self.id.as_str())
126            .map(|s| s.info.clone())
127            .ok_or_else(|| TusError::NotFound(self.id.to_string()))
128    }
129
130    async fn finalize(&mut self) -> Result<(), TusError> {
131        // In-memory: nothing to do (already committed on write_chunk)
132        Ok(())
133    }
134
135    async fn delete(self) -> Result<(), TusError> {
136        let mut state = self.store.write().await;
137        state
138            .remove(self.id.as_str())
139            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
140        Ok(())
141    }
142
143    async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
144        let mut state = self.store.write().await;
145        let entry = state
146            .get_mut(self.id.as_str())
147            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
148        if entry.info.size.is_some() {
149            return Err(TusError::UploadLengthAlreadySet);
150        }
151        entry.info.size = Some(length);
152        entry.info.size_is_deferred = false;
153        Ok(())
154    }
155
156    async fn read_content(&self) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, TusError> {
157        let state = self.store.read().await;
158        let entry = state
159            .get(self.id.as_str())
160            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
161        if !entry.info.is_complete() {
162            return Err(TusError::UploadNotReadyForDownload);
163        }
164        let bytes = bytes::Bytes::copy_from_slice(&entry.data);
165        Ok(Box::new(Cursor::new(bytes)))
166    }
167
168    async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
169        // Collect data from each partial upload
170        let mut combined = BytesMut::new();
171        {
172            let state = self.store.read().await;
173            for partial in partials {
174                let entry = state
175                    .get(partial.id.as_str())
176                    .ok_or_else(|| TusError::NotFound(partial.id.to_string()))?;
177                combined.extend_from_slice(&entry.data);
178            }
179        }
180
181        let total = combined.len() as u64;
182        let mut state = self.store.write().await;
183        let entry = state
184            .get_mut(self.id.as_str())
185            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
186        entry.data = combined;
187        entry.info.size = Some(total);
188        entry.info.offset = total;
189        entry.info.is_final = true;
190        Ok(())
191    }
192}
193
194/// Retrieve the raw bytes of a completed upload (useful in tests).
195pub async fn get_upload_data(store: &MemoryStore, id: &UploadId) -> Option<bytes::Bytes> {
196    let state = store.state.read().await;
197    state
198        .get(id.as_str())
199        .map(|s| bytes::Bytes::copy_from_slice(&s.data))
200}