Skip to main content

fileloft_store_memory/
store.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use bytes::BytesMut;
5use tokio::io::AsyncReadExt;
6use tokio::sync::RwLock;
7use fileloft_core::{
8    error::TusError,
9    info::{UploadId, UploadInfo},
10    store::{SendDataStore, SendUpload},
11};
12
13#[derive(Debug)]
14struct MemoryUploadState {
15    info: UploadInfo,
16    data: BytesMut,
17}
18
19type StoreMap = Arc<RwLock<HashMap<String, MemoryUploadState>>>;
20
21/// An in-memory tus storage backend.
22///
23/// Data is held in a shared `Arc<RwLock<HashMap>>` so all handles and clones
24/// refer to the same underlying state. Suitable for testing and development;
25/// not intended for production use (no persistence, no multi-process support).
26#[derive(Clone)]
27pub struct MemoryStore {
28    state: StoreMap,
29}
30
31impl MemoryStore {
32    pub fn new() -> Self {
33        Self {
34            state: Arc::new(RwLock::new(HashMap::new())),
35        }
36    }
37}
38
39impl Default for MemoryStore {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl SendDataStore for MemoryStore {
46    type UploadType = MemoryUpload;
47
48    async fn create_upload(&self, info: UploadInfo) -> Result<MemoryUpload, TusError> {
49        let id = info.id.clone();
50        let mut state = self.state.write().await;
51        state.insert(
52            id.as_str().to_string(),
53            MemoryUploadState {
54                info,
55                data: BytesMut::new(),
56            },
57        );
58        Ok(MemoryUpload {
59            id,
60            store: Arc::clone(&self.state),
61        })
62    }
63
64    async fn get_upload(&self, id: &UploadId) -> Result<MemoryUpload, TusError> {
65        let state = self.state.read().await;
66        if state.contains_key(id.as_str()) {
67            Ok(MemoryUpload {
68                id: id.clone(),
69                store: Arc::clone(&self.state),
70            })
71        } else {
72            Err(TusError::NotFound(id.to_string()))
73        }
74    }
75}
76
77/// Handle to a single in-memory upload.
78pub struct MemoryUpload {
79    id: UploadId,
80    store: StoreMap,
81}
82
83impl SendUpload for MemoryUpload {
84    async fn write_chunk(
85        &mut self,
86        offset: u64,
87        reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
88    ) -> Result<u64, TusError> {
89        let mut buf = Vec::new();
90        reader.read_to_end(&mut buf).await?;
91        let n = buf.len() as u64;
92
93        let mut state = self.store.write().await;
94        let entry = state
95            .get_mut(self.id.as_str())
96            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
97
98        let end_offset = offset.checked_add(n).ok_or_else(|| {
99            TusError::Internal("upload offset overflow".into())
100        })?;
101        if let Some(declared) = entry.info.size {
102            if end_offset > declared {
103                return Err(TusError::ExceedsUploadLength {
104                    declared,
105                    end: end_offset,
106                });
107            }
108        }
109
110        // Ensure the buffer is large enough
111        let end = (offset + n) as usize;
112        if entry.data.len() < end {
113            entry.data.resize(end, 0);
114        }
115        entry.data[offset as usize..end].copy_from_slice(&buf);
116        entry.info.offset = offset + n;
117        Ok(n)
118    }
119
120    async fn get_info(&self) -> Result<UploadInfo, TusError> {
121        let state = self.store.read().await;
122        state
123            .get(self.id.as_str())
124            .map(|s| s.info.clone())
125            .ok_or_else(|| TusError::NotFound(self.id.to_string()))
126    }
127
128    async fn finalize(&mut self) -> Result<(), TusError> {
129        // In-memory: nothing to do (already committed on write_chunk)
130        Ok(())
131    }
132
133    async fn delete(self) -> Result<(), TusError> {
134        let mut state = self.store.write().await;
135        state
136            .remove(self.id.as_str())
137            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
138        Ok(())
139    }
140
141    async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
142        let mut state = self.store.write().await;
143        let entry = state
144            .get_mut(self.id.as_str())
145            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
146        if entry.info.size.is_some() {
147            return Err(TusError::UploadLengthAlreadySet);
148        }
149        entry.info.size = Some(length);
150        entry.info.size_is_deferred = false;
151        Ok(())
152    }
153
154    async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
155        // Collect data from each partial upload
156        let mut combined = BytesMut::new();
157        {
158            let state = self.store.read().await;
159            for partial in partials {
160                let entry = state
161                    .get(partial.id.as_str())
162                    .ok_or_else(|| TusError::NotFound(partial.id.to_string()))?;
163                combined.extend_from_slice(&entry.data);
164            }
165        }
166
167        let total = combined.len() as u64;
168        let mut state = self.store.write().await;
169        let entry = state
170            .get_mut(self.id.as_str())
171            .ok_or_else(|| TusError::NotFound(self.id.to_string()))?;
172        entry.data = combined;
173        entry.info.size = Some(total);
174        entry.info.offset = total;
175        entry.info.is_final = true;
176        Ok(())
177    }
178}
179
180/// Retrieve the raw bytes of a completed upload (useful in tests).
181pub async fn get_upload_data(store: &MemoryStore, id: &UploadId) -> Option<bytes::Bytes> {
182    let state = store.state.read().await;
183    state
184        .get(id.as_str())
185        .map(|s| bytes::Bytes::copy_from_slice(&s.data))
186}