1use async_trait::async_trait;
2use chrono::Utc;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, RwLock};
5
6use super::{MemoryConfig, Storage, StorageError};
7use crate::core::{Job, JobState};
8
/// An exclusive claim a worker holds on a job while processing it.
#[derive(Debug, Clone)]
struct JobLock {
    /// Id of the worker that acquired the lock.
    worker_id: String,
    /// Instant after which the lock no longer counts as held
    /// (expired locks are pruned by `cleanup_expired_locks`).
    expires_at: chrono::DateTime<chrono::Utc>,
}
15
/// In-memory storage backend: jobs and their locks live in process-local
/// maps and are lost when the process exits.
#[derive(Debug)]
pub struct MemoryStorage {
    /// All stored jobs, keyed by job id.
    jobs: RwLock<HashMap<String, Job>>,
    /// Active job locks, keyed by job id.
    /// NOTE(review): the `Arc` looks unnecessary — the map is only ever
    /// reached through `&self` in this file — confirm nothing clones it out.
    locks: Arc<Mutex<HashMap<String, JobLock>>>,
    /// Capacity and auto-cleanup settings for this instance.
    config: MemoryConfig,
}
27
28impl MemoryStorage {
29 pub fn new() -> Self {
31 Self::with_config(MemoryConfig::default())
32 }
33
34 pub fn with_config(config: MemoryConfig) -> Self {
36 Self {
37 jobs: RwLock::new(HashMap::new()),
38 locks: Arc::new(Mutex::new(HashMap::new())),
39 config,
40 }
41 }
42
43 pub fn len(&self) -> usize {
45 self.jobs.read().unwrap().len()
46 }
47
48 pub fn is_empty(&self) -> bool {
50 self.jobs.read().unwrap().is_empty()
51 }
52
53 pub fn clear(&self) {
55 self.jobs.write().unwrap().clear();
56 }
57
58 fn is_at_capacity(&self) -> bool {
60 if let Some(max_jobs) = self.config.max_jobs {
61 self.len() >= max_jobs
62 } else {
63 false
64 }
65 }
66
67 fn maybe_cleanup(&self) {
69 if !self.config.auto_cleanup {
70 return;
71 }
72
73 let mut jobs = self.jobs.write().unwrap();
74 jobs.retain(|_, job| {
75 !matches!(
76 job.state,
77 JobState::Succeeded { .. } | JobState::Deleted { .. }
78 )
79 });
80 }
81
82 fn filter_jobs_by_state(jobs: &HashMap<String, Job>, state: &JobState) -> Vec<Job> {
84 jobs.values()
85 .filter(|job| std::mem::discriminant(&job.state) == std::mem::discriminant(state))
86 .cloned()
87 .collect()
88 }
89
90 fn get_available_jobs_internal(jobs: &HashMap<String, Job>, limit: Option<usize>) -> Vec<Job> {
92 let now = Utc::now();
93 let mut available_jobs: Vec<Job> = jobs
94 .values()
95 .filter(|job| match &job.state {
96 JobState::Enqueued { .. } => true,
97 JobState::Scheduled { enqueue_at, .. } => *enqueue_at <= now,
98 JobState::AwaitingRetry { retry_at, .. } => *retry_at <= now,
99 _ => false,
100 })
101 .cloned()
102 .collect();
103
104 available_jobs.sort_by(|a, b| {
106 b.priority
107 .cmp(&a.priority)
108 .then_with(|| a.created_at.cmp(&b.created_at))
109 });
110
111 if let Some(limit) = limit {
112 available_jobs.truncate(limit);
113 }
114
115 available_jobs
116 }
117}
118
119impl Default for MemoryStorage {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125#[async_trait]
126impl Storage for MemoryStorage {
127 async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
128 if self.is_at_capacity() {
130 return Err(StorageError::capacity_exceeded(format!(
131 "Memory storage is at capacity ({} jobs)",
132 self.len()
133 )));
134 }
135
136 self.maybe_cleanup();
138
139 let mut jobs = self.jobs.write().unwrap();
141 jobs.insert(job.id.clone(), job.clone());
142
143 Ok(())
144 }
145
146 async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
147 let jobs = self.jobs.read().unwrap();
148 Ok(jobs.get(job_id).cloned())
149 }
150
151 async fn update(&self, job: &Job) -> Result<(), StorageError> {
152 let mut jobs = self.jobs.write().unwrap();
153
154 if jobs.contains_key(&job.id) {
155 jobs.insert(job.id.clone(), job.clone());
156 Ok(())
157 } else {
158 Err(StorageError::job_not_found(job.id.clone()))
159 }
160 }
161
162 async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
163 let mut jobs = self.jobs.write().unwrap();
164 Ok(jobs.remove(job_id).is_some())
165 }
166
167 async fn list(
168 &self,
169 state_filter: Option<&JobState>,
170 limit: Option<usize>,
171 offset: Option<usize>,
172 ) -> Result<Vec<Job>, StorageError> {
173 let jobs = self.jobs.read().unwrap();
174
175 let mut filtered_jobs: Vec<Job> = if let Some(state) = state_filter {
176 Self::filter_jobs_by_state(&jobs, state)
177 } else {
178 jobs.values().cloned().collect()
179 };
180
181 filtered_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
183
184 let start = offset.unwrap_or(0);
186 let end = if let Some(limit) = limit {
187 std::cmp::min(start + limit, filtered_jobs.len())
188 } else {
189 filtered_jobs.len()
190 };
191
192 if start >= filtered_jobs.len() {
193 Ok(vec![])
194 } else {
195 Ok(filtered_jobs[start..end].to_vec())
196 }
197 }
198
199 async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
200 let jobs = self.jobs.read().unwrap();
201 let mut counts = HashMap::new();
202
203 for job in jobs.values() {
204 let key = match &job.state {
205 JobState::Enqueued { .. } => JobState::enqueued(""),
206 JobState::Processing { .. } => JobState::processing("", ""),
207 JobState::Succeeded { .. } => JobState::succeeded(0, None),
208 JobState::Failed { .. } => JobState::failed("", None, 0),
209 JobState::Deleted { .. } => JobState::deleted(None),
210 JobState::Scheduled { .. } => JobState::scheduled(Utc::now(), ""),
211 JobState::AwaitingRetry { .. } => JobState::awaiting_retry(Utc::now(), 0, ""),
212 };
213 *counts.entry(key).or_insert(0) += 1;
214 }
215
216 Ok(counts)
217 }
218
219 async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
220 let jobs = self.jobs.read().unwrap();
221 Ok(Self::get_available_jobs_internal(&jobs, limit))
222 }
223
224 async fn fetch_and_lock_job(
225 &self,
226 worker_id: &str,
227 queues: Option<&[String]>,
228 ) -> Result<Option<Job>, StorageError> {
229 self.cleanup_expired_locks();
231
232 let mut jobs = self.jobs.write().unwrap();
233 let mut locks = self.locks.lock().unwrap();
234
235 let available_jobs = Self::get_available_jobs_internal(&jobs, None);
237
238 for mut job in available_jobs {
239 if let Some(queues) = queues {
241 if !queues.is_empty() && !queues.contains(&job.queue) {
242 continue;
243 }
244 }
245
246 if locks.contains_key(&job.id) {
248 continue;
249 }
250
251 let lock = JobLock {
253 worker_id: worker_id.to_string(),
254 expires_at: chrono::Utc::now() + chrono::Duration::minutes(30), };
256 locks.insert(job.id.clone(), lock);
257
258 job.state = JobState::Processing {
260 worker_id: worker_id.to_string(),
261 started_at: chrono::Utc::now(),
262 server_name: "memory-storage".to_string(),
263 };
264
265 jobs.insert(job.id.clone(), job.clone());
267
268 return Ok(Some(job));
269 }
270
271 Ok(None)
272 }
273
274 async fn try_acquire_job_lock(
275 &self,
276 job_id: &str,
277 worker_id: &str,
278 timeout_seconds: u64,
279 ) -> Result<bool, StorageError> {
280 self.cleanup_expired_locks();
281
282 let mut locks = self.locks.lock().unwrap();
283
284 if locks.contains_key(job_id) {
286 return Ok(false);
287 }
288
289 let lock = JobLock {
291 worker_id: worker_id.to_string(),
292 expires_at: chrono::Utc::now() + chrono::Duration::seconds(timeout_seconds as i64),
293 };
294 locks.insert(job_id.to_string(), lock);
295
296 Ok(true)
297 }
298
299 async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
300 let mut locks = self.locks.lock().unwrap();
301
302 if let Some(lock) = locks.get(job_id) {
303 if lock.worker_id == worker_id {
304 locks.remove(job_id);
305 return Ok(true);
306 }
307 }
308
309 Ok(false)
310 }
311
312 async fn fetch_available_jobs_atomic(
313 &self,
314 worker_id: &str,
315 limit: Option<usize>,
316 queues: Option<&[String]>,
317 ) -> Result<Vec<Job>, StorageError> {
318 let mut jobs = Vec::new();
319 let fetch_limit = limit.unwrap_or(10).min(100); for _ in 0..fetch_limit {
323 match self.fetch_and_lock_job(worker_id, queues).await? {
324 Some(job) => jobs.push(job),
325 None => break, }
327 }
328
329 Ok(jobs)
330 }
331}
332
333impl MemoryStorage {
334 fn cleanup_expired_locks(&self) {
336 let mut locks = self.locks.lock().unwrap();
337 let now = chrono::Utc::now();
338 locks.retain(|_, lock| lock.expires_at > now);
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::Job;
    use chrono::Duration;

    /// Builds the minimal job shared by the tests below.
    fn create_test_job() -> Job {
        Job::new("test_job", vec!["test_arg".to_string()])
    }

    #[tokio::test]
    async fn test_memory_storage_basic_operations() {
        let storage = MemoryStorage::new();
        let job = create_test_job();

        // Enqueue and verify the job is stored.
        assert!(storage.enqueue(&job).await.is_ok());
        assert_eq!(storage.len(), 1);

        // Fetch it back by id.
        let fetched = storage.get(&job.id).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().id, job.id);

        // Update its state and confirm the change is visible.
        let mut modified = job.clone();
        modified.state = JobState::processing("worker1", "server1");
        assert!(storage.update(&modified).await.is_ok());

        let after_update = storage.get(&job.id).await.unwrap().unwrap();
        assert!(matches!(after_update.state, JobState::Processing { .. }));

        // Delete succeeds once, then reports the job as gone.
        assert!(storage.delete(&job.id).await.unwrap());
        assert_eq!(storage.len(), 0);
        assert!(!storage.delete(&job.id).await.unwrap());
    }

    #[tokio::test]
    async fn test_memory_storage_list_operations() {
        let storage = MemoryStorage::new();

        let mut enqueued = create_test_job();
        enqueued.state = JobState::enqueued("default");

        let mut processing = create_test_job();
        processing.state = JobState::processing("worker1", "server1");

        let mut succeeded = create_test_job();
        succeeded.state = JobState::succeeded(100, None);

        storage.enqueue(&enqueued).await.unwrap();
        storage.enqueue(&processing).await.unwrap();
        storage.enqueue(&succeeded).await.unwrap();

        // No filter: everything comes back.
        assert_eq!(storage.list(None, None, None).await.unwrap().len(), 3);

        // Filter by state variant.
        let filter = JobState::enqueued("default");
        let matching = storage.list(Some(&filter), None, None).await.unwrap();
        assert_eq!(matching.len(), 1);
        assert!(matches!(matching[0].state, JobState::Enqueued { .. }));

        // Limit, offset, and both together.
        assert_eq!(storage.list(None, Some(2), None).await.unwrap().len(), 2);
        assert_eq!(storage.list(None, None, Some(1)).await.unwrap().len(), 2);
        assert_eq!(storage.list(None, Some(1), Some(1)).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_memory_storage_job_counts() {
        let storage = MemoryStorage::new();

        let mut first = create_test_job();
        first.state = JobState::enqueued("default");

        let mut second = create_test_job();
        second.state = JobState::enqueued("default");

        let mut third = create_test_job();
        third.state = JobState::processing("worker1", "server1");

        storage.enqueue(&first).await.unwrap();
        storage.enqueue(&second).await.unwrap();
        storage.enqueue(&third).await.unwrap();

        let counts = storage.get_job_counts().await.unwrap();

        // Both variants present must be represented as keys.
        assert!(counts.len() >= 2);
        assert!(counts
            .keys()
            .any(|k| matches!(k, JobState::Enqueued { .. })));
        assert!(counts
            .keys()
            .any(|k| matches!(k, JobState::Processing { .. })));
    }

    #[tokio::test]
    async fn test_memory_storage_available_jobs() {
        let storage = MemoryStorage::new();

        let mut ready = create_test_job();
        ready.state = JobState::enqueued("default");

        let mut past_due = create_test_job();
        past_due.state = JobState::scheduled(Utc::now() - Duration::hours(1), "delay");

        let mut not_yet_due = create_test_job();
        not_yet_due.state = JobState::scheduled(Utc::now() + Duration::hours(1), "delay");

        let mut running = create_test_job();
        running.state = JobState::processing("worker1", "server1");

        storage.enqueue(&ready).await.unwrap();
        storage.enqueue(&past_due).await.unwrap();
        storage.enqueue(&not_yet_due).await.unwrap();
        storage.enqueue(&running).await.unwrap();

        // Only the enqueued job and the past-due scheduled job qualify.
        assert_eq!(storage.get_available_jobs(None).await.unwrap().len(), 2);
        assert_eq!(storage.get_available_jobs(Some(1)).await.unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_memory_storage_capacity_limit() {
        let config = MemoryConfig::new().with_max_jobs(2);
        let storage = MemoryStorage::with_config(config);

        assert!(storage.enqueue(&create_test_job()).await.is_ok());
        assert!(storage.enqueue(&create_test_job()).await.is_ok());

        // A third job overflows the configured capacity.
        let overflow = storage.enqueue(&create_test_job()).await;
        assert!(overflow.is_err());
        assert!(matches!(
            overflow.unwrap_err(),
            StorageError::CapacityExceeded { .. }
        ));
    }

    #[tokio::test]
    async fn test_memory_storage_auto_cleanup() {
        let config = MemoryConfig::new().with_max_jobs(3).with_auto_cleanup(true);
        let storage = MemoryStorage::with_config(config);

        let mut finished = create_test_job();
        finished.state = JobState::succeeded(100, None);

        let mut pending = create_test_job();
        pending.state = JobState::enqueued("default");

        let fresh = create_test_job();

        storage.enqueue(&finished).await.unwrap();
        storage.enqueue(&pending).await.unwrap();

        // Enqueueing more jobs triggers cleanup of the succeeded one,
        // leaving only the pending and fresh jobs stored.
        assert!(storage.enqueue(&fresh).await.is_ok());
        assert_eq!(storage.len(), 2);
    }

    #[tokio::test]
    async fn test_memory_storage_update_nonexistent() {
        let storage = MemoryStorage::new();
        let job = create_test_job();

        // Updating a job that was never enqueued must fail.
        let outcome = storage.update(&job).await;
        assert!(outcome.is_err());
        assert!(matches!(
            outcome.unwrap_err(),
            StorageError::JobNotFound { .. }
        ));
    }
}