dog_blob/
session_store.rs1use async_trait::async_trait;
2use crate::{
3 BlobError, BlobResult, PartReceipt, UploadId, UploadSession, UploadSessionStore, UploadStatus,
4};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7
8#[derive(Clone)]
13pub struct MemoryUploadSessionStore {
14 sessions: Arc<Mutex<HashMap<String, UploadSession>>>,
15}
16
17impl MemoryUploadSessionStore {
18 pub fn new() -> Self {
20 Self {
21 sessions: Arc::new(Mutex::new(HashMap::new())),
22 }
23 }
24
25 fn with_session_mut<F, R>(&self, upload_id: &UploadId, f: F) -> BlobResult<R>
27 where
28 F: FnOnce(&mut UploadSession) -> R,
29 {
30 let mut sessions = self.sessions.lock().unwrap();
31 let session = sessions
32 .get_mut(&upload_id.to_string())
33 .ok_or_else(|| BlobError::not_found("Upload session not found"))?;
34 Ok(f(session))
35 }
36
37 fn current_timestamp() -> i64 {
39 chrono::Utc::now().timestamp()
40 }
41}
42
43impl Default for MemoryUploadSessionStore {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49#[async_trait]
50impl UploadSessionStore for MemoryUploadSessionStore {
51 async fn create(&self, session: UploadSession) -> BlobResult<UploadSession> {
52 let mut sessions = self.sessions.lock().unwrap();
53 sessions.insert(session.upload_id.to_string(), session.clone());
54 Ok(session)
55 }
56
57 async fn get(&self, upload_id: &UploadId) -> BlobResult<UploadSession> {
58 let sessions = self.sessions.lock().unwrap();
59 sessions
60 .get(&upload_id.to_string())
61 .cloned()
62 .ok_or_else(|| BlobError::not_found("Upload session not found"))
63 }
64
65 async fn update(&self, session: UploadSession) -> BlobResult<UploadSession> {
66 let mut sessions = self.sessions.lock().unwrap();
67 sessions.insert(session.upload_id.to_string(), session.clone());
68 Ok(session)
69 }
70
71 async fn delete(&self, upload_id: &UploadId) -> BlobResult<()> {
72 let mut sessions = self.sessions.lock().unwrap();
73 sessions.remove(&upload_id.to_string());
74 Ok(())
75 }
76
77 async fn record_part(&self, upload_id: &UploadId, receipt: PartReceipt) -> BlobResult<()> {
78 self.with_session_mut(upload_id, |session| {
79 session.progress.parts.insert(receipt.part_number, receipt);
80 session.progress.received_bytes =
81 session.progress.parts.values().map(|p| p.size_bytes).sum();
82 })
83 }
84
85 async fn mark_completed(&self, upload_id: &UploadId, _timestamp: i64) -> BlobResult<()> {
86 self.with_session_mut(upload_id, |session| {
87 session.status = UploadStatus::Completed {
88 completed_at: Self::current_timestamp(),
89 };
90 })
91 }
92
93 async fn mark_failed(
94 &self,
95 upload_id: &UploadId,
96 _timestamp: i64,
97 error: String,
98 ) -> BlobResult<()> {
99 self.with_session_mut(upload_id, |session| {
100 session.status = UploadStatus::Failed {
101 failed_at: Self::current_timestamp(),
102 reason: error,
103 };
104 })
105 }
106
107 async fn mark_aborted(&self, upload_id: &UploadId, _timestamp: i64) -> BlobResult<()> {
108 self.with_session_mut(upload_id, |session| {
109 session.status = UploadStatus::Aborted {
110 aborted_at: Self::current_timestamp(),
111 };
112 })
113 }
114}