// qml_rs/storage/memory.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, RwLock};
5
6use super::{MemoryConfig, Storage, StorageError};
7use crate::core::{Job, JobState};
8
/// Job lock information for MemoryStorage
///
/// A lock marks a job as claimed by a single worker until `expires_at`;
/// stale entries are pruned lazily by `cleanup_expired_locks`.
#[derive(Debug, Clone)]
struct JobLock {
    // Identifier of the worker holding the lock; releasing requires a match.
    worker_id: String,
    // UTC instant after which the lock is considered expired and reclaimable.
    expires_at: chrono::DateTime<chrono::Utc>,
}
15
/// In-memory storage implementation for jobs
///
/// This storage keeps all jobs in memory using a HashMap with RwLock for thread safety.
/// It's primarily intended for development, testing, and simple scenarios where persistence
/// is not required.
#[derive(Debug)]
pub struct MemoryStorage {
    // Primary job store keyed by job id; RwLock permits concurrent readers.
    jobs: RwLock<HashMap<String, Job>>,
    // Active worker locks keyed by job id. All critical sections are short
    // read-modify-write operations, so a plain Mutex suffices.
    locks: Arc<Mutex<HashMap<String, JobLock>>>,
    // Behavior knobs: optional capacity limit and auto-cleanup of finished jobs.
    config: MemoryConfig,
}
27
28impl MemoryStorage {
29    /// Create a new memory storage with default configuration
30    pub fn new() -> Self {
31        Self::with_config(MemoryConfig::default())
32    }
33
34    /// Create a new memory storage with the specified configuration
35    pub fn with_config(config: MemoryConfig) -> Self {
36        Self {
37            jobs: RwLock::new(HashMap::new()),
38            locks: Arc::new(Mutex::new(HashMap::new())),
39            config,
40        }
41    }
42
43    /// Get the number of jobs currently stored
44    pub fn len(&self) -> usize {
45        self.jobs.read().unwrap().len()
46    }
47
48    /// Check if the storage is empty
49    pub fn is_empty(&self) -> bool {
50        self.jobs.read().unwrap().is_empty()
51    }
52
53    /// Clear all jobs from storage
54    pub fn clear(&self) {
55        self.jobs.write().unwrap().clear();
56    }
57
58    /// Check if we've exceeded the maximum job limit
59    fn is_at_capacity(&self) -> bool {
60        if let Some(max_jobs) = self.config.max_jobs {
61            self.len() >= max_jobs
62        } else {
63            false
64        }
65    }
66
67    /// Remove completed jobs if auto-cleanup is enabled
68    fn maybe_cleanup(&self) {
69        if !self.config.auto_cleanup {
70            return;
71        }
72
73        let mut jobs = self.jobs.write().unwrap();
74        jobs.retain(|_, job| {
75            !matches!(
76                job.state,
77                JobState::Succeeded { .. } | JobState::Deleted { .. }
78            )
79        });
80    }
81
82    /// Filter jobs by state
83    fn filter_jobs_by_state(jobs: &HashMap<String, Job>, state: &JobState) -> Vec<Job> {
84        jobs.values()
85            .filter(|job| std::mem::discriminant(&job.state) == std::mem::discriminant(state))
86            .cloned()
87            .collect()
88    }
89
90    /// Get jobs available for processing (enqueued, scheduled for now, or awaiting retry)
91    fn get_available_jobs_internal(jobs: &HashMap<String, Job>, limit: Option<usize>) -> Vec<Job> {
92        let now = Utc::now();
93        let mut available_jobs: Vec<Job> = jobs
94            .values()
95            .filter(|job| match &job.state {
96                JobState::Enqueued { .. } => true,
97                JobState::Scheduled { enqueue_at, .. } => *enqueue_at <= now,
98                JobState::AwaitingRetry { retry_at, .. } => *retry_at <= now,
99                _ => false,
100            })
101            .cloned()
102            .collect();
103
104        // Sort by priority (higher priority first), then by creation time (older first)
105        available_jobs.sort_by(|a, b| {
106            b.priority
107                .cmp(&a.priority)
108                .then_with(|| a.created_at.cmp(&b.created_at))
109        });
110
111        if let Some(limit) = limit {
112            available_jobs.truncate(limit);
113        }
114
115        available_jobs
116    }
117}
118
impl Default for MemoryStorage {
    /// Equivalent to [`MemoryStorage::new`] (default `MemoryConfig`).
    fn default() -> Self {
        Self::new()
    }
}
124
125#[async_trait]
126impl Storage for MemoryStorage {
127    async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
128        // Check capacity before adding
129        if self.is_at_capacity() {
130            return Err(StorageError::capacity_exceeded(format!(
131                "Memory storage is at capacity ({} jobs)",
132                self.len()
133            )));
134        }
135
136        // Perform cleanup if enabled
137        self.maybe_cleanup();
138
139        // Store the job
140        let mut jobs = self.jobs.write().unwrap();
141        jobs.insert(job.id.clone(), job.clone());
142
143        Ok(())
144    }
145
146    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
147        let jobs = self.jobs.read().unwrap();
148        Ok(jobs.get(job_id).cloned())
149    }
150
151    async fn update(&self, job: &Job) -> Result<(), StorageError> {
152        let mut jobs = self.jobs.write().unwrap();
153
154        if jobs.contains_key(&job.id) {
155            jobs.insert(job.id.clone(), job.clone());
156            Ok(())
157        } else {
158            Err(StorageError::job_not_found(job.id.clone()))
159        }
160    }
161
162    async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
163        let mut jobs = self.jobs.write().unwrap();
164        Ok(jobs.remove(job_id).is_some())
165    }
166
167    async fn list(
168        &self,
169        state_filter: Option<&JobState>,
170        limit: Option<usize>,
171        offset: Option<usize>,
172    ) -> Result<Vec<Job>, StorageError> {
173        let jobs = self.jobs.read().unwrap();
174
175        let mut filtered_jobs: Vec<Job> = if let Some(state) = state_filter {
176            Self::filter_jobs_by_state(&jobs, state)
177        } else {
178            jobs.values().cloned().collect()
179        };
180
181        // Sort by creation time (newest first)
182        filtered_jobs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
183
184        // Apply offset and limit
185        let start = offset.unwrap_or(0);
186        let end = if let Some(limit) = limit {
187            std::cmp::min(start + limit, filtered_jobs.len())
188        } else {
189            filtered_jobs.len()
190        };
191
192        if start >= filtered_jobs.len() {
193            Ok(vec![])
194        } else {
195            Ok(filtered_jobs[start..end].to_vec())
196        }
197    }
198
199    async fn get_job_counts(&self) -> Result<HashMap<JobState, usize>, StorageError> {
200        let jobs = self.jobs.read().unwrap();
201        let mut counts = HashMap::new();
202
203        for job in jobs.values() {
204            let key = match &job.state {
205                JobState::Enqueued { .. } => JobState::enqueued(""),
206                JobState::Processing { .. } => JobState::processing("", ""),
207                JobState::Succeeded { .. } => JobState::succeeded(0, None),
208                JobState::Failed { .. } => JobState::failed("", None, 0),
209                JobState::Deleted { .. } => JobState::deleted(None),
210                JobState::Scheduled { .. } => JobState::scheduled(Utc::now(), ""),
211                JobState::AwaitingRetry { .. } => JobState::awaiting_retry(Utc::now(), 0, ""),
212            };
213            *counts.entry(key).or_insert(0) += 1;
214        }
215
216        Ok(counts)
217    }
218
219    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
220        let jobs = self.jobs.read().unwrap();
221        Ok(Self::get_available_jobs_internal(&jobs, limit))
222    }
223
224    async fn fetch_and_lock_job(
225        &self,
226        worker_id: &str,
227        queues: Option<&[String]>,
228    ) -> Result<Option<Job>, StorageError> {
229        // Clean up expired locks first
230        self.cleanup_expired_locks();
231
232        let mut jobs = self.jobs.write().unwrap();
233        let mut locks = self.locks.lock().unwrap();
234
235        // Find an available job that's not locked
236        let available_jobs = Self::get_available_jobs_internal(&jobs, None);
237
238        for mut job in available_jobs {
239            // Check if job matches queue filter
240            if let Some(queues) = queues {
241                if !queues.is_empty() && !queues.contains(&job.queue) {
242                    continue;
243                }
244            }
245
246            // Check if job is already locked
247            if locks.contains_key(&job.id) {
248                continue;
249            }
250
251            // Lock the job and mark as processing
252            let lock = JobLock {
253                worker_id: worker_id.to_string(),
254                expires_at: chrono::Utc::now() + chrono::Duration::minutes(30), // 30 minute default timeout
255            };
256            locks.insert(job.id.clone(), lock);
257
258            // Update job state
259            job.state = JobState::Processing {
260                worker_id: worker_id.to_string(),
261                started_at: chrono::Utc::now(),
262                server_name: "memory-storage".to_string(),
263            };
264
265            // Store updated job
266            jobs.insert(job.id.clone(), job.clone());
267
268            return Ok(Some(job));
269        }
270
271        Ok(None)
272    }
273
274    async fn try_acquire_job_lock(
275        &self,
276        job_id: &str,
277        worker_id: &str,
278        timeout_seconds: u64,
279    ) -> Result<bool, StorageError> {
280        self.cleanup_expired_locks();
281
282        let mut locks = self.locks.lock().unwrap();
283
284        // Check if job is already locked
285        if locks.contains_key(job_id) {
286            return Ok(false);
287        }
288
289        // Acquire the lock
290        let lock = JobLock {
291            worker_id: worker_id.to_string(),
292            expires_at: chrono::Utc::now() + chrono::Duration::seconds(timeout_seconds as i64),
293        };
294        locks.insert(job_id.to_string(), lock);
295
296        Ok(true)
297    }
298
299    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
300        let mut locks = self.locks.lock().unwrap();
301
302        if let Some(lock) = locks.get(job_id) {
303            if lock.worker_id == worker_id {
304                locks.remove(job_id);
305                return Ok(true);
306            }
307        }
308
309        Ok(false)
310    }
311
312    async fn fetch_available_jobs_atomic(
313        &self,
314        worker_id: &str,
315        limit: Option<usize>,
316        queues: Option<&[String]>,
317    ) -> Result<Vec<Job>, StorageError> {
318        let mut jobs = Vec::new();
319        let fetch_limit = limit.unwrap_or(10).min(100); // Cap at 100 jobs
320
321        // Fetch jobs one by one to ensure proper locking
322        for _ in 0..fetch_limit {
323            match self.fetch_and_lock_job(worker_id, queues).await? {
324                Some(job) => jobs.push(job),
325                None => break, // No more available jobs
326            }
327        }
328
329        Ok(jobs)
330    }
331}
332
impl MemoryStorage {
    /// Clean up expired locks
    ///
    /// Drops every lock whose `expires_at` is at or before the current time.
    /// Invoked lazily from the lock-acquisition paths rather than on a timer.
    fn cleanup_expired_locks(&self) {
        let mut locks = self.locks.lock().unwrap();
        let now = chrono::Utc::now();
        locks.retain(|_, lock| lock.expires_at > now);
    }
}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::Job;
    use chrono::Duration;

    /// Build a minimal job shared by the tests below.
    fn create_test_job() -> Job {
        Job::new("test_job", vec!["test_arg".to_string()])
    }

    /// Round-trips a job through enqueue/get/update/delete and checks the
    /// delete-of-missing-id case.
    #[tokio::test]
    async fn test_memory_storage_basic_operations() {
        let storage = MemoryStorage::new();
        let job = create_test_job();

        // Test enqueue
        assert!(storage.enqueue(&job).await.is_ok());
        assert_eq!(storage.len(), 1);

        // Test get
        let retrieved = storage.get(&job.id).await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, job.id);

        // Test update
        let mut updated_job = job.clone();
        updated_job.state = JobState::processing("worker1", "server1");
        assert!(storage.update(&updated_job).await.is_ok());

        let retrieved = storage.get(&job.id).await.unwrap().unwrap();
        assert!(matches!(retrieved.state, JobState::Processing { .. }));

        // Test delete
        let deleted = storage.delete(&job.id).await.unwrap();
        assert!(deleted);
        assert_eq!(storage.len(), 0);

        // Test delete non-existent
        let deleted = storage.delete(&job.id).await.unwrap();
        assert!(!deleted);
    }

    /// Exercises `list` with no filter, a state-variant filter, and each
    /// combination of limit/offset pagination.
    #[tokio::test]
    async fn test_memory_storage_list_operations() {
        let storage = MemoryStorage::new();

        // Create jobs with different states
        let mut job1 = create_test_job();
        job1.state = JobState::enqueued("default");

        let mut job2 = create_test_job();
        job2.state = JobState::processing("worker1", "server1");

        let mut job3 = create_test_job();
        job3.state = JobState::succeeded(100, None);

        storage.enqueue(&job1).await.unwrap();
        storage.enqueue(&job2).await.unwrap();
        storage.enqueue(&job3).await.unwrap();

        // Test list all
        let all_jobs = storage.list(None, None, None).await.unwrap();
        assert_eq!(all_jobs.len(), 3);

        // Test list by state (matches on variant, not payload)
        let enqueued_state = JobState::enqueued("default");
        let enqueued_jobs = storage
            .list(Some(&enqueued_state), None, None)
            .await
            .unwrap();
        assert_eq!(enqueued_jobs.len(), 1);
        assert!(matches!(enqueued_jobs[0].state, JobState::Enqueued { .. }));

        // Test list with limit
        let limited_jobs = storage.list(None, Some(2), None).await.unwrap();
        assert_eq!(limited_jobs.len(), 2);

        // Test list with offset
        let offset_jobs = storage.list(None, None, Some(1)).await.unwrap();
        assert_eq!(offset_jobs.len(), 2);

        // Test list with limit and offset
        let paginated_jobs = storage.list(None, Some(1), Some(1)).await.unwrap();
        assert_eq!(paginated_jobs.len(), 1);
    }

    /// Verifies `get_job_counts` groups jobs by state variant.
    #[tokio::test]
    async fn test_memory_storage_job_counts() {
        let storage = MemoryStorage::new();

        let mut job1 = create_test_job();
        job1.state = JobState::enqueued("default");

        let mut job2 = create_test_job();
        job2.state = JobState::enqueued("default");

        let mut job3 = create_test_job();
        job3.state = JobState::processing("worker1", "server1");

        storage.enqueue(&job1).await.unwrap();
        storage.enqueue(&job2).await.unwrap();
        storage.enqueue(&job3).await.unwrap();

        let counts = storage.get_job_counts().await.unwrap();

        // Check that we have the right number of different state types
        assert!(counts.len() >= 2);

        // Since we're grouping by state type, we should have some enqueued and processing
        let has_enqueued = counts
            .keys()
            .any(|k| matches!(k, JobState::Enqueued { .. }));
        let has_processing = counts
            .keys()
            .any(|k| matches!(k, JobState::Processing { .. }));
        assert!(has_enqueued);
        assert!(has_processing);
    }

    /// Availability rules: enqueued and past-scheduled jobs are available;
    /// future-scheduled and processing jobs are not.
    #[tokio::test]
    async fn test_memory_storage_available_jobs() {
        let storage = MemoryStorage::new();

        // Create jobs with different states and schedules
        let mut job1 = create_test_job();
        job1.state = JobState::enqueued("default");

        let mut job2 = create_test_job();
        job2.state = JobState::scheduled(Utc::now() - Duration::hours(1), "delay");

        let mut job3 = create_test_job();
        job3.state = JobState::scheduled(Utc::now() + Duration::hours(1), "delay");

        let mut job4 = create_test_job();
        job4.state = JobState::processing("worker1", "server1");

        storage.enqueue(&job1).await.unwrap();
        storage.enqueue(&job2).await.unwrap();
        storage.enqueue(&job3).await.unwrap();
        storage.enqueue(&job4).await.unwrap();

        let available = storage.get_available_jobs(None).await.unwrap();
        assert_eq!(available.len(), 2); // job1 (enqueued) and job2 (scheduled for past)

        // Test with limit
        let limited_available = storage.get_available_jobs(Some(1)).await.unwrap();
        assert_eq!(limited_available.len(), 1);
    }

    /// Enqueue beyond `max_jobs` must fail with `CapacityExceeded`.
    #[tokio::test]
    async fn test_memory_storage_capacity_limit() {
        let config = MemoryConfig::new().with_max_jobs(2);
        let storage = MemoryStorage::with_config(config);

        let job1 = create_test_job();
        let job2 = create_test_job();
        let job3 = create_test_job();

        assert!(storage.enqueue(&job1).await.is_ok());
        assert!(storage.enqueue(&job2).await.is_ok());

        // Third job should fail due to capacity
        let result = storage.enqueue(&job3).await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            StorageError::CapacityExceeded { .. }
        ));
    }

    /// With auto-cleanup on, enqueue evicts Succeeded jobs so new work fits.
    #[tokio::test]
    async fn test_memory_storage_auto_cleanup() {
        let config = MemoryConfig::new().with_max_jobs(3).with_auto_cleanup(true);
        let storage = MemoryStorage::with_config(config);

        let mut job1 = create_test_job();
        job1.state = JobState::succeeded(100, None); // Will be cleaned up

        let mut job2 = create_test_job();
        job2.state = JobState::enqueued("default"); // Will remain

        let job3 = create_test_job(); // New job

        storage.enqueue(&job1).await.unwrap();
        storage.enqueue(&job2).await.unwrap();

        // This should trigger cleanup and succeed
        assert!(storage.enqueue(&job3).await.is_ok());
        assert_eq!(storage.len(), 2); // job1 should be cleaned up
    }

    /// Updating a job that was never stored must yield `JobNotFound`.
    #[tokio::test]
    async fn test_memory_storage_update_nonexistent() {
        let storage = MemoryStorage::new();
        let job = create_test_job();

        let result = storage.update(&job).await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            StorageError::JobNotFound { .. }
        ));
    }
}