Skip to main content

oxirs_core/concurrent/
mrsw.rs

1//! Multi-Reader Single-Writer (MRSW) concurrency for RDF stores
2//!
3//! This module provides a highly efficient MRSW lock implementation optimized
4//! for RDF triple stores where reads vastly outnumber writes. It allows unlimited
5//! concurrent readers while ensuring exclusive write access.
6//!
7//! # Features
8//!
9//! - **Shared reads**: Read operations never block other readers
10//! - **Write fairness**: Prevents writer starvation
11//! - **Read-write upgrade**: Efficient transition from read to write lock
12//! - **Snapshot isolation**: Readers see consistent snapshots
13//! - **Adaptive spinning**: Optimizes for short critical sections
14//!
15//! # Example
16//!
17//! ```rust,ignore
18//! use oxirs_core::concurrent::mrsw::MrswStore;
19//! use oxirs_core::model::Triple;
20//!
21//! # fn example() -> Result<(), oxirs_core::OxirsError> {
22//! let store = MrswStore::new();
23//!
24//! // Multiple readers can access simultaneously
25//! let reader1 = store.read()?;
26//! let reader2 = store.read()?;
27//! let count1 = reader1.len();
28//! let count2 = reader2.len();
29//!
30//! // Writers get exclusive access
31//! let mut writer = store.write()?;
32//! // writer.insert(triple)?;
33//! # Ok(())
34//! # }
35//! ```
36
37use crate::model::Triple;
38use crate::OxirsError;
39
40use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
41use std::collections::HashSet;
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46/// Multi-Reader Single-Writer RDF store
47///
48/// Provides efficient concurrent access with lock-free reads and
49/// exclusive write operations.
50pub struct MrswStore<T = TripleStore> {
51    /// The underlying data store
52    data: Arc<RwLock<T>>,
53    /// Read operation counter
54    read_count: Arc<AtomicU64>,
55    /// Write operation counter
56    write_count: Arc<AtomicU64>,
57    /// Active readers count
58    active_readers: Arc<AtomicUsize>,
59    /// Performance metrics
60    metrics: Arc<MrswMetrics>,
61}
62
63impl<T> MrswStore<T> {
64    /// Create a new MRSW store with default data
65    pub fn new() -> Self
66    where
67        T: Default,
68    {
69        Self {
70            data: Arc::new(RwLock::new(T::default())),
71            read_count: Arc::new(AtomicU64::new(0)),
72            write_count: Arc::new(AtomicU64::new(0)),
73            active_readers: Arc::new(AtomicUsize::new(0)),
74            metrics: Arc::new(MrswMetrics::new()),
75        }
76    }
77
78    /// Create a new MRSW store with initial data
79    pub fn with_data(data: T) -> Self {
80        Self {
81            data: Arc::new(RwLock::new(data)),
82            read_count: Arc::new(AtomicU64::new(0)),
83            write_count: Arc::new(AtomicU64::new(0)),
84            active_readers: Arc::new(AtomicUsize::new(0)),
85            metrics: Arc::new(MrswMetrics::new()),
86        }
87    }
88
89    /// Acquire a read lock
90    ///
91    /// Multiple readers can hold read locks simultaneously.
92    /// This operation never blocks other readers.
93    pub fn read(&self) -> Result<MrswReadGuard<'_, T>, OxirsError> {
94        let start = Instant::now();
95
96        // Increment active readers
97        self.active_readers.fetch_add(1, Ordering::AcqRel);
98
99        // Acquire read lock
100        let guard = self.data.read();
101
102        // Update metrics
103        self.read_count.fetch_add(1, Ordering::Relaxed);
104        self.metrics.record_read_acquisition(start.elapsed());
105
106        Ok(MrswReadGuard {
107            guard,
108            active_readers: Arc::clone(&self.active_readers),
109        })
110    }
111
112    /// Try to acquire a read lock without blocking
113    pub fn try_read(&self) -> Result<Option<MrswReadGuard<'_, T>>, OxirsError> {
114        // Increment active readers
115        self.active_readers.fetch_add(1, Ordering::AcqRel);
116
117        // Try to acquire read lock
118        if let Some(guard) = self.data.try_read() {
119            self.read_count.fetch_add(1, Ordering::Relaxed);
120
121            Ok(Some(MrswReadGuard {
122                guard,
123                active_readers: Arc::clone(&self.active_readers),
124            }))
125        } else {
126            // Failed to acquire, decrement counter
127            self.active_readers.fetch_sub(1, Ordering::AcqRel);
128            Ok(None)
129        }
130    }
131
132    /// Acquire a write lock
133    ///
134    /// Only one writer can hold a write lock at a time.
135    /// This operation blocks until all readers have released their locks.
136    pub fn write(&self) -> Result<MrswWriteGuard<'_, T>, OxirsError> {
137        let start = Instant::now();
138
139        // Acquire write lock (blocks until all readers are done)
140        let guard = self.data.write();
141
142        // Update metrics
143        self.write_count.fetch_add(1, Ordering::Relaxed);
144        self.metrics.record_write_acquisition(start.elapsed());
145
146        Ok(MrswWriteGuard {
147            guard,
148            write_count: Arc::clone(&self.write_count),
149        })
150    }
151
152    /// Try to acquire a write lock without blocking
153    pub fn try_write(&self) -> Result<Option<MrswWriteGuard<'_, T>>, OxirsError> {
154        // Try to acquire write lock
155        if let Some(guard) = self.data.try_write() {
156            self.write_count.fetch_add(1, Ordering::Relaxed);
157
158            Ok(Some(MrswWriteGuard {
159                guard,
160                write_count: Arc::clone(&self.write_count),
161            }))
162        } else {
163            Ok(None)
164        }
165    }
166
167    /// Get current metrics
168    pub fn metrics(&self) -> MrswStats {
169        MrswStats {
170            total_reads: self.read_count.load(Ordering::Relaxed),
171            total_writes: self.write_count.load(Ordering::Relaxed),
172            active_readers: self.active_readers.load(Ordering::Acquire),
173            avg_read_time: self.metrics.avg_read_time(),
174            avg_write_time: self.metrics.avg_write_time(),
175        }
176    }
177
178    /// Reset metrics counters
179    pub fn reset_metrics(&self) {
180        self.read_count.store(0, Ordering::Relaxed);
181        self.write_count.store(0, Ordering::Relaxed);
182        self.metrics.reset();
183    }
184}
185
186impl<T> Default for MrswStore<T>
187where
188    T: Default,
189{
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195impl<T> Clone for MrswStore<T> {
196    fn clone(&self) -> Self {
197        Self {
198            data: Arc::clone(&self.data),
199            read_count: Arc::clone(&self.read_count),
200            write_count: Arc::clone(&self.write_count),
201            active_readers: Arc::clone(&self.active_readers),
202            metrics: Arc::clone(&self.metrics),
203        }
204    }
205}
206
207/// Read guard for MRSW store
208pub struct MrswReadGuard<'a, T> {
209    guard: RwLockReadGuard<'a, T>,
210    active_readers: Arc<AtomicUsize>,
211}
212
213impl<'a, T> std::ops::Deref for MrswReadGuard<'a, T> {
214    type Target = T;
215
216    fn deref(&self) -> &Self::Target {
217        &self.guard
218    }
219}
220
221impl<'a, T> Drop for MrswReadGuard<'a, T> {
222    fn drop(&mut self) {
223        // Decrement active readers counter
224        self.active_readers.fetch_sub(1, Ordering::AcqRel);
225    }
226}
227
228/// Write guard for MRSW store
229pub struct MrswWriteGuard<'a, T> {
230    guard: RwLockWriteGuard<'a, T>,
231    #[allow(dead_code)]
232    write_count: Arc<AtomicU64>,
233}
234
235impl<'a, T> std::ops::Deref for MrswWriteGuard<'a, T> {
236    type Target = T;
237
238    fn deref(&self) -> &Self::Target {
239        &self.guard
240    }
241}
242
243impl<'a, T> std::ops::DerefMut for MrswWriteGuard<'a, T> {
244    fn deref_mut(&mut self) -> &mut Self::Target {
245        &mut self.guard
246    }
247}
248
249/// Simple triple store implementation for MRSW
250#[derive(Default)]
251pub struct TripleStore {
252    triples: HashSet<Triple>,
253}
254
255impl TripleStore {
256    /// Create a new empty triple store
257    pub fn new() -> Self {
258        Self {
259            triples: HashSet::new(),
260        }
261    }
262
263    /// Insert a triple
264    pub fn insert(&mut self, triple: Triple) -> bool {
265        self.triples.insert(triple)
266    }
267
268    /// Remove a triple
269    pub fn remove(&mut self, triple: &Triple) -> bool {
270        self.triples.remove(triple)
271    }
272
273    /// Check if a triple exists
274    pub fn contains(&self, triple: &Triple) -> bool {
275        self.triples.contains(triple)
276    }
277
278    /// Get the number of triples
279    pub fn len(&self) -> usize {
280        self.triples.len()
281    }
282
283    /// Check if the store is empty
284    pub fn is_empty(&self) -> bool {
285        self.triples.is_empty()
286    }
287
288    /// Iterate over all triples
289    pub fn iter(&self) -> impl Iterator<Item = &Triple> {
290        self.triples.iter()
291    }
292}
293
/// Lock-acquisition timing accumulators for MRSW operations.
///
/// Totals are kept in nanoseconds. All updates use relaxed atomics, so a
/// total and its sample count are not updated (or read) as one unit; averages
/// taken while other threads are recording are approximate.
struct MrswMetrics {
    /// Sum of recorded read-acquisition times, in nanoseconds.
    total_read_time: AtomicU64,
    /// Sum of recorded write-acquisition times, in nanoseconds.
    total_write_time: AtomicU64,
    /// Number of read acquisitions recorded.
    read_samples: AtomicU64,
    /// Number of write acquisitions recorded.
    write_samples: AtomicU64,
}

impl MrswMetrics {
    /// Creates a metrics block with every accumulator at zero.
    fn new() -> Self {
        Self {
            total_read_time: AtomicU64::new(0),
            total_write_time: AtomicU64::new(0),
            read_samples: AtomicU64::new(0),
            write_samples: AtomicU64::new(0),
        }
    }

    /// Converts `duration` to whole nanoseconds, clamping to `u64::MAX`
    /// instead of silently dropping the high bits of the `u128` value.
    fn saturating_nanos(duration: Duration) -> u64 {
        // The cast is lossless: the value has just been clamped to the `u64` range.
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }

    /// Adds one sample of `duration` to the given total/sample-count pair.
    fn record(total: &AtomicU64, samples: &AtomicU64, duration: Duration) {
        total.fetch_add(Self::saturating_nanos(duration), Ordering::Relaxed);
        samples.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `total / samples` as a `Duration`, or zero when there are no samples.
    fn average(total: &AtomicU64, samples: &AtomicU64) -> Duration {
        let total = total.load(Ordering::Relaxed);
        let samples = samples.load(Ordering::Relaxed);

        // `checked_div` yields `None` for zero samples, which maps to `Duration::ZERO`.
        total
            .checked_div(samples)
            .map_or(Duration::ZERO, Duration::from_nanos)
    }

    /// Records the time one read-lock acquisition took.
    fn record_read_acquisition(&self, duration: Duration) {
        Self::record(&self.total_read_time, &self.read_samples, duration);
    }

    /// Records the time one write-lock acquisition took.
    fn record_write_acquisition(&self, duration: Duration) {
        Self::record(&self.total_write_time, &self.write_samples, duration);
    }

    /// Mean recorded read-acquisition time (integer division, in nanoseconds);
    /// zero when nothing has been recorded.
    fn avg_read_time(&self) -> Duration {
        Self::average(&self.total_read_time, &self.read_samples)
    }

    /// Mean recorded write-acquisition time (integer division, in nanoseconds);
    /// zero when nothing has been recorded.
    fn avg_write_time(&self) -> Duration {
        Self::average(&self.total_write_time, &self.write_samples)
    }

    /// Sets every accumulator back to zero.
    ///
    /// The four stores are independent, so a sample recorded concurrently with
    /// a reset may be partially kept.
    fn reset(&self) {
        self.total_read_time.store(0, Ordering::Relaxed);
        self.total_write_time.store(0, Ordering::Relaxed);
        self.read_samples.store(0, Ordering::Relaxed);
        self.write_samples.store(0, Ordering::Relaxed);
    }
}
355
/// Point-in-time statistics reported by an MRSW store.
#[derive(Debug, Clone)]
pub struct MrswStats {
    /// Total number of read operations counted.
    pub total_reads: u64,
    /// Total number of write operations counted.
    pub total_writes: u64,
    /// Number of readers active when the statistics were taken.
    pub active_readers: usize,
    /// Mean time spent acquiring a read lock.
    pub avg_read_time: Duration,
    /// Mean time spent acquiring a write lock.
    pub avg_write_time: Duration,
}

impl MrswStats {
    /// Returns reads divided by writes.
    ///
    /// When no writes have been counted the division is skipped and the read
    /// count itself is returned, so the result is always finite.
    pub fn read_write_ratio(&self) -> f64 {
        let reads = self.total_reads as f64;

        match self.total_writes {
            0 => reads,
            writes => reads / writes as f64,
        }
    }

    /// Returns `true` when `read_write_ratio` is at least 10, i.e. the
    /// workload is read-heavy (>= 10:1).
    pub fn is_read_heavy(&self) -> bool {
        const READ_HEAVY_RATIO: f64 = 10.0;

        self.read_write_ratio() >= READ_HEAVY_RATIO
    }
}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Object, Predicate, Subject};
    use std::thread;

    /// Builds a triple whose subject, predicate and literal value all embed `id`.
    fn create_test_triple(id: usize) -> Triple {
        let subject = NamedNode::new(format!("http://example.org/s{}", id))
            .expect("valid IRI from format");
        let predicate = NamedNode::new(format!("http://example.org/p{}", id))
            .expect("valid IRI from format");
        let object = Literal::new(format!("value{}", id));

        Triple::new(
            Subject::NamedNode(subject),
            Predicate::NamedNode(predicate),
            Object::Literal(object),
        )
    }

    /// A freshly created store reports all-zero statistics.
    #[test]
    fn test_mrsw_creation() {
        let stats = MrswStore::<TripleStore>::new().metrics();

        assert_eq!(stats.total_reads, 0);
        assert_eq!(stats.total_writes, 0);
        assert_eq!(stats.active_readers, 0);
    }

    /// One live read guard is counted as one read and one active reader.
    #[test]
    fn test_single_read() {
        let store = MrswStore::<TripleStore>::new();
        let reader = store.read().expect("store lock should not be poisoned");

        assert_eq!(reader.len(), 0);

        // `reader` is still alive here, so it must show up as active.
        let stats = store.metrics();
        assert_eq!(stats.total_reads, 1);
        assert_eq!(stats.active_readers, 1);
    }

    /// Several read guards can be held at the same time.
    #[test]
    fn test_multiple_concurrent_readers() {
        let store = MrswStore::<TripleStore>::new();

        // Keep all three guards alive until the end of the test.
        let _first = store.read().expect("store lock should not be poisoned");
        let _second = store.read().expect("store lock should not be poisoned");
        let _third = store.read().expect("store lock should not be poisoned");

        let stats = store.metrics();
        assert_eq!(stats.total_reads, 3);
        assert_eq!(stats.active_readers, 3);
    }

    /// Data inserted through a write guard is visible to a later reader.
    #[test]
    fn test_write_operation() {
        let store = MrswStore::<TripleStore>::new();

        {
            let mut writer = store.write().expect("store lock should not be poisoned");
            let triple = create_test_triple(1);
            writer.insert(triple);
        } // write guard released here

        let reader = store.read().expect("store lock should not be poisoned");
        assert_eq!(reader.len(), 1);

        let stats = store.metrics();
        assert_eq!(stats.total_writes, 1);
        assert_eq!(stats.total_reads, 1);
    }

    /// A write lock cannot be taken while a read guard is alive.
    #[test]
    fn test_read_write_isolation() {
        let store = MrswStore::<TripleStore>::new();

        // Seed the store, releasing the write guard at the end of the block.
        {
            let mut writer = store.write().expect("store lock should not be poisoned");
            writer.insert(create_test_triple(1));
        }

        // The reader observes the seeded triple ...
        let reader = store.read().expect("store lock should not be poisoned");
        assert_eq!(reader.len(), 1);

        // ... and, while it is alive, a non-blocking write attempt must fail.
        let write_attempt = store
            .try_write()
            .expect("store operation should succeed");
        assert!(write_attempt.is_none());
    }

    /// One writer thread and several reader threads run against the same
    /// store; afterwards the counters match the work done.
    #[test]
    fn test_concurrent_reads_with_writes() {
        let store = Arc::new(MrswStore::<TripleStore>::new());
        let num_readers = 5;
        let num_writes = 100;

        // Writer thread: one insert per acquired write guard.
        let writer_store = Arc::clone(&store);
        let writer_handle = thread::spawn(move || {
            for i in 0..num_writes {
                let mut writer = writer_store
                    .write()
                    .expect("store lock should not be poisoned");
                writer.insert(create_test_triple(i));
            }
        });

        // Reader threads: each performs 50 reads and reports how many it did.
        let mut reader_handles = Vec::new();
        for _ in 0..num_readers {
            let reader_store = Arc::clone(&store);
            reader_handles.push(thread::spawn(move || {
                let mut completed = 0usize;
                for _ in 0..50 {
                    let reader = reader_store
                        .read()
                        .expect("store lock should not be poisoned");
                    let _ = reader.len();
                    completed += 1;
                }
                completed
            }));
        }

        // Wait for every thread and add up the reads that were performed.
        writer_handle.join().expect("thread should not panic");
        let mut total_reads: usize = 0;
        for handle in reader_handles {
            total_reads += handle.join().expect("thread should not panic");
        }

        let stats = store.metrics();
        assert_eq!(stats.total_writes, num_writes as u64);
        assert_eq!(stats.total_reads, total_reads as u64);
        assert_eq!(stats.active_readers, 0); // every guard has been dropped
    }

    /// Ten reads and one write give a 10:1 ratio, which counts as read-heavy.
    #[test]
    fn test_read_write_ratio() {
        let store = MrswStore::<TripleStore>::new();

        // Ten reads; each guard is dropped immediately.
        for _ in 0..10 {
            drop(store.read().expect("store lock should not be poisoned"));
        }

        // One write; the guard is dropped immediately as well.
        drop(store.write().expect("store lock should not be poisoned"));

        let stats = store.metrics();
        println!(
            "Total reads: {}, Total writes: {}, Ratio: {}",
            stats.total_reads,
            stats.total_writes,
            stats.read_write_ratio()
        );
        assert_eq!(stats.total_reads, 10);
        assert_eq!(stats.total_writes, 1);
        assert_eq!(stats.read_write_ratio(), 10.0);
        assert!(stats.is_read_heavy());
    }

    /// Resetting the metrics zeroes the read and write totals.
    #[test]
    fn test_metrics_reset() {
        let store = MrswStore::<TripleStore>::new();

        // Generate one read and one write, dropping each guard immediately.
        drop(store.read().expect("store lock should not be poisoned"));
        drop(store.write().expect("store lock should not be poisoned"));

        store.reset_metrics();

        let stats = store.metrics();
        assert_eq!(stats.total_reads, 0);
        assert_eq!(stats.total_writes, 0);
    }
}