Skip to main content

dog_blob/
session_store.rs

1use async_trait::async_trait;
2use crate::{
3    BlobError, BlobResult, PartReceipt, UploadId, UploadSession, UploadSessionStore, UploadStatus,
4};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7
8/// Simple in-memory upload session store provided by dog-blob
9/// 
10/// This is a basic implementation suitable for development and single-instance deployments.
11/// For production use with multiple instances, consider implementing a database-backed store.
12#[derive(Clone)]
13pub struct MemoryUploadSessionStore {
14    sessions: Arc<Mutex<HashMap<String, UploadSession>>>,
15}
16
17impl MemoryUploadSessionStore {
18    /// Create a new in-memory session store
19    pub fn new() -> Self {
20        Self {
21            sessions: Arc::new(Mutex::new(HashMap::new())),
22        }
23    }
24
25    /// Helper method to execute operations on a session, reducing redundancy
26    fn with_session_mut<F, R>(&self, upload_id: &UploadId, f: F) -> BlobResult<R>
27    where
28        F: FnOnce(&mut UploadSession) -> R,
29    {
30        let mut sessions = self.sessions.lock().unwrap();
31        let session = sessions
32            .get_mut(&upload_id.to_string())
33            .ok_or_else(|| BlobError::not_found("Upload session not found"))?;
34        Ok(f(session))
35    }
36
37    /// Get current timestamp
38    fn current_timestamp() -> i64 {
39        chrono::Utc::now().timestamp()
40    }
41}
42
43impl Default for MemoryUploadSessionStore {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49#[async_trait]
50impl UploadSessionStore for MemoryUploadSessionStore {
51    async fn create(&self, session: UploadSession) -> BlobResult<UploadSession> {
52        let mut sessions = self.sessions.lock().unwrap();
53        sessions.insert(session.upload_id.to_string(), session.clone());
54        Ok(session)
55    }
56
57    async fn get(&self, upload_id: &UploadId) -> BlobResult<UploadSession> {
58        let sessions = self.sessions.lock().unwrap();
59        sessions
60            .get(&upload_id.to_string())
61            .cloned()
62            .ok_or_else(|| BlobError::not_found("Upload session not found"))
63    }
64
65    async fn update(&self, session: UploadSession) -> BlobResult<UploadSession> {
66        let mut sessions = self.sessions.lock().unwrap();
67        sessions.insert(session.upload_id.to_string(), session.clone());
68        Ok(session)
69    }
70
71    async fn delete(&self, upload_id: &UploadId) -> BlobResult<()> {
72        let mut sessions = self.sessions.lock().unwrap();
73        sessions.remove(&upload_id.to_string());
74        Ok(())
75    }
76
77    async fn record_part(&self, upload_id: &UploadId, receipt: PartReceipt) -> BlobResult<()> {
78        self.with_session_mut(upload_id, |session| {
79            session.progress.parts.insert(receipt.part_number, receipt);
80            session.progress.received_bytes =
81                session.progress.parts.values().map(|p| p.size_bytes).sum();
82        })
83    }
84
85    async fn mark_completed(&self, upload_id: &UploadId, _timestamp: i64) -> BlobResult<()> {
86        self.with_session_mut(upload_id, |session| {
87            session.status = UploadStatus::Completed {
88                completed_at: Self::current_timestamp(),
89            };
90        })
91    }
92
93    async fn mark_failed(
94        &self,
95        upload_id: &UploadId,
96        _timestamp: i64,
97        error: String,
98    ) -> BlobResult<()> {
99        self.with_session_mut(upload_id, |session| {
100            session.status = UploadStatus::Failed {
101                failed_at: Self::current_timestamp(),
102                reason: error,
103            };
104        })
105    }
106
107    async fn mark_aborted(&self, upload_id: &UploadId, _timestamp: i64) -> BlobResult<()> {
108        self.with_session_mut(upload_id, |session| {
109            session.status = UploadStatus::Aborted {
110                aborted_at: Self::current_timestamp(),
111            };
112        })
113    }
114}