1use crate::model::Triple;
38use crate::OxirsError;
39
40use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
41use std::collections::HashSet;
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46pub struct MrswStore<T = TripleStore> {
51 data: Arc<RwLock<T>>,
53 read_count: Arc<AtomicU64>,
55 write_count: Arc<AtomicU64>,
57 active_readers: Arc<AtomicUsize>,
59 metrics: Arc<MrswMetrics>,
61}
62
63impl<T> MrswStore<T> {
64 pub fn new() -> Self
66 where
67 T: Default,
68 {
69 Self {
70 data: Arc::new(RwLock::new(T::default())),
71 read_count: Arc::new(AtomicU64::new(0)),
72 write_count: Arc::new(AtomicU64::new(0)),
73 active_readers: Arc::new(AtomicUsize::new(0)),
74 metrics: Arc::new(MrswMetrics::new()),
75 }
76 }
77
78 pub fn with_data(data: T) -> Self {
80 Self {
81 data: Arc::new(RwLock::new(data)),
82 read_count: Arc::new(AtomicU64::new(0)),
83 write_count: Arc::new(AtomicU64::new(0)),
84 active_readers: Arc::new(AtomicUsize::new(0)),
85 metrics: Arc::new(MrswMetrics::new()),
86 }
87 }
88
89 pub fn read(&self) -> Result<MrswReadGuard<'_, T>, OxirsError> {
94 let start = Instant::now();
95
96 self.active_readers.fetch_add(1, Ordering::AcqRel);
98
99 let guard = self.data.read();
101
102 self.read_count.fetch_add(1, Ordering::Relaxed);
104 self.metrics.record_read_acquisition(start.elapsed());
105
106 Ok(MrswReadGuard {
107 guard,
108 active_readers: Arc::clone(&self.active_readers),
109 })
110 }
111
112 pub fn try_read(&self) -> Result<Option<MrswReadGuard<'_, T>>, OxirsError> {
114 self.active_readers.fetch_add(1, Ordering::AcqRel);
116
117 if let Some(guard) = self.data.try_read() {
119 self.read_count.fetch_add(1, Ordering::Relaxed);
120
121 Ok(Some(MrswReadGuard {
122 guard,
123 active_readers: Arc::clone(&self.active_readers),
124 }))
125 } else {
126 self.active_readers.fetch_sub(1, Ordering::AcqRel);
128 Ok(None)
129 }
130 }
131
132 pub fn write(&self) -> Result<MrswWriteGuard<'_, T>, OxirsError> {
137 let start = Instant::now();
138
139 let guard = self.data.write();
141
142 self.write_count.fetch_add(1, Ordering::Relaxed);
144 self.metrics.record_write_acquisition(start.elapsed());
145
146 Ok(MrswWriteGuard {
147 guard,
148 write_count: Arc::clone(&self.write_count),
149 })
150 }
151
152 pub fn try_write(&self) -> Result<Option<MrswWriteGuard<'_, T>>, OxirsError> {
154 if let Some(guard) = self.data.try_write() {
156 self.write_count.fetch_add(1, Ordering::Relaxed);
157
158 Ok(Some(MrswWriteGuard {
159 guard,
160 write_count: Arc::clone(&self.write_count),
161 }))
162 } else {
163 Ok(None)
164 }
165 }
166
167 pub fn metrics(&self) -> MrswStats {
169 MrswStats {
170 total_reads: self.read_count.load(Ordering::Relaxed),
171 total_writes: self.write_count.load(Ordering::Relaxed),
172 active_readers: self.active_readers.load(Ordering::Acquire),
173 avg_read_time: self.metrics.avg_read_time(),
174 avg_write_time: self.metrics.avg_write_time(),
175 }
176 }
177
178 pub fn reset_metrics(&self) {
180 self.read_count.store(0, Ordering::Relaxed);
181 self.write_count.store(0, Ordering::Relaxed);
182 self.metrics.reset();
183 }
184}
185
186impl<T> Default for MrswStore<T>
187where
188 T: Default,
189{
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195impl<T> Clone for MrswStore<T> {
196 fn clone(&self) -> Self {
197 Self {
198 data: Arc::clone(&self.data),
199 read_count: Arc::clone(&self.read_count),
200 write_count: Arc::clone(&self.write_count),
201 active_readers: Arc::clone(&self.active_readers),
202 metrics: Arc::clone(&self.metrics),
203 }
204 }
205}
206
207pub struct MrswReadGuard<'a, T> {
209 guard: RwLockReadGuard<'a, T>,
210 active_readers: Arc<AtomicUsize>,
211}
212
213impl<'a, T> std::ops::Deref for MrswReadGuard<'a, T> {
214 type Target = T;
215
216 fn deref(&self) -> &Self::Target {
217 &self.guard
218 }
219}
220
221impl<'a, T> Drop for MrswReadGuard<'a, T> {
222 fn drop(&mut self) {
223 self.active_readers.fetch_sub(1, Ordering::AcqRel);
225 }
226}
227
228pub struct MrswWriteGuard<'a, T> {
230 guard: RwLockWriteGuard<'a, T>,
231 #[allow(dead_code)]
232 write_count: Arc<AtomicU64>,
233}
234
235impl<'a, T> std::ops::Deref for MrswWriteGuard<'a, T> {
236 type Target = T;
237
238 fn deref(&self) -> &Self::Target {
239 &self.guard
240 }
241}
242
243impl<'a, T> std::ops::DerefMut for MrswWriteGuard<'a, T> {
244 fn deref_mut(&mut self) -> &mut Self::Target {
245 &mut self.guard
246 }
247}
248
249#[derive(Default)]
251pub struct TripleStore {
252 triples: HashSet<Triple>,
253}
254
255impl TripleStore {
256 pub fn new() -> Self {
258 Self {
259 triples: HashSet::new(),
260 }
261 }
262
263 pub fn insert(&mut self, triple: Triple) -> bool {
265 self.triples.insert(triple)
266 }
267
268 pub fn remove(&mut self, triple: &Triple) -> bool {
270 self.triples.remove(triple)
271 }
272
273 pub fn contains(&self, triple: &Triple) -> bool {
275 self.triples.contains(triple)
276 }
277
278 pub fn len(&self) -> usize {
280 self.triples.len()
281 }
282
283 pub fn is_empty(&self) -> bool {
285 self.triples.is_empty()
286 }
287
288 pub fn iter(&self) -> impl Iterator<Item = &Triple> {
290 self.triples.iter()
291 }
292}
293
294struct MrswMetrics {
296 total_read_time: AtomicU64,
298 total_write_time: AtomicU64,
300 read_samples: AtomicU64,
302 write_samples: AtomicU64,
304}
305
306impl MrswMetrics {
307 fn new() -> Self {
308 Self {
309 total_read_time: AtomicU64::new(0),
310 total_write_time: AtomicU64::new(0),
311 read_samples: AtomicU64::new(0),
312 write_samples: AtomicU64::new(0),
313 }
314 }
315
316 fn record_read_acquisition(&self, duration: Duration) {
317 let nanos = duration.as_nanos() as u64;
318 self.total_read_time.fetch_add(nanos, Ordering::Relaxed);
319 self.read_samples.fetch_add(1, Ordering::Relaxed);
320 }
321
322 fn record_write_acquisition(&self, duration: Duration) {
323 let nanos = duration.as_nanos() as u64;
324 self.total_write_time.fetch_add(nanos, Ordering::Relaxed);
325 self.write_samples.fetch_add(1, Ordering::Relaxed);
326 }
327
328 fn avg_read_time(&self) -> Duration {
329 let total = self.total_read_time.load(Ordering::Relaxed);
330 let samples = self.read_samples.load(Ordering::Relaxed);
331
332 total
333 .checked_div(samples)
334 .map(Duration::from_nanos)
335 .unwrap_or(Duration::ZERO)
336 }
337
338 fn avg_write_time(&self) -> Duration {
339 let total = self.total_write_time.load(Ordering::Relaxed);
340 let samples = self.write_samples.load(Ordering::Relaxed);
341
342 total
343 .checked_div(samples)
344 .map(Duration::from_nanos)
345 .unwrap_or(Duration::ZERO)
346 }
347
348 fn reset(&self) {
349 self.total_read_time.store(0, Ordering::Relaxed);
350 self.total_write_time.store(0, Ordering::Relaxed);
351 self.read_samples.store(0, Ordering::Relaxed);
352 self.write_samples.store(0, Ordering::Relaxed);
353 }
354}
355
356#[derive(Debug, Clone)]
358pub struct MrswStats {
359 pub total_reads: u64,
361 pub total_writes: u64,
363 pub active_readers: usize,
365 pub avg_read_time: Duration,
367 pub avg_write_time: Duration,
369}
370
371impl MrswStats {
372 pub fn read_write_ratio(&self) -> f64 {
374 if self.total_writes > 0 {
375 self.total_reads as f64 / self.total_writes as f64
376 } else {
377 self.total_reads as f64
378 }
379 }
380
381 pub fn is_read_heavy(&self) -> bool {
383 self.read_write_ratio() >= 10.0
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::model::{Literal, NamedNode, Object, Predicate, Subject};
391 use std::thread;
392
393 fn create_test_triple(id: usize) -> Triple {
394 Triple::new(
395 Subject::NamedNode(
396 NamedNode::new(format!("http://example.org/s{}", id))
397 .expect("valid IRI from format"),
398 ),
399 Predicate::NamedNode(
400 NamedNode::new(format!("http://example.org/p{}", id))
401 .expect("valid IRI from format"),
402 ),
403 Object::Literal(Literal::new(format!("value{}", id))),
404 )
405 }
406
407 #[test]
408 fn test_mrsw_creation() {
409 let store = MrswStore::<TripleStore>::new();
410 let stats = store.metrics();
411
412 assert_eq!(stats.total_reads, 0);
413 assert_eq!(stats.total_writes, 0);
414 assert_eq!(stats.active_readers, 0);
415 }
416
417 #[test]
418 fn test_single_read() {
419 let store = MrswStore::<TripleStore>::new();
420 let reader = store.read().expect("store lock should not be poisoned");
421
422 assert_eq!(reader.len(), 0);
423
424 let stats = store.metrics();
425 assert_eq!(stats.total_reads, 1);
426 assert_eq!(stats.active_readers, 1);
427 }
428
429 #[test]
430 fn test_multiple_concurrent_readers() {
431 let store = MrswStore::<TripleStore>::new();
432
433 let _reader1 = store.read().expect("store lock should not be poisoned");
435 let _reader2 = store.read().expect("store lock should not be poisoned");
436 let _reader3 = store.read().expect("store lock should not be poisoned");
437
438 let stats = store.metrics();
439 assert_eq!(stats.total_reads, 3);
440 assert_eq!(stats.active_readers, 3);
441 }
442
443 #[test]
444 fn test_write_operation() {
445 let store = MrswStore::<TripleStore>::new();
446
447 {
448 let mut writer = store.write().expect("store lock should not be poisoned");
449 let triple = create_test_triple(1);
450 writer.insert(triple);
451 }
452
453 let reader = store.read().expect("store lock should not be poisoned");
454 assert_eq!(reader.len(), 1);
455
456 let stats = store.metrics();
457 assert_eq!(stats.total_writes, 1);
458 assert_eq!(stats.total_reads, 1);
459 }
460
461 #[test]
462 fn test_read_write_isolation() {
463 let store = MrswStore::<TripleStore>::new();
464
465 {
467 let mut writer = store.write().expect("store lock should not be poisoned");
468 writer.insert(create_test_triple(1));
469 }
470
471 let reader = store.read().expect("store lock should not be poisoned");
473 assert_eq!(reader.len(), 1);
474
475 assert!(store
477 .try_write()
478 .expect("store operation should succeed")
479 .is_none());
480 }
481
482 #[test]
483 fn test_concurrent_reads_with_writes() {
484 let store = Arc::new(MrswStore::<TripleStore>::new());
485 let num_readers = 5;
486 let num_writes = 100;
487
488 let store_clone = Arc::clone(&store);
490 let writer_handle = thread::spawn(move || {
491 for i in 0..num_writes {
492 let mut writer = store_clone
493 .write()
494 .expect("store lock should not be poisoned");
495 writer.insert(create_test_triple(i));
496 }
497 });
498
499 let reader_handles: Vec<_> = (0..num_readers)
501 .map(|_| {
502 let store_clone = Arc::clone(&store);
503 thread::spawn(move || {
504 let mut reads = 0;
505 for _ in 0..50 {
506 let reader = store_clone
507 .read()
508 .expect("store lock should not be poisoned");
509 let _ = reader.len();
510 reads += 1;
511 }
512 reads
513 })
514 })
515 .collect();
516
517 writer_handle.join().expect("thread should not panic");
519 let total_reads: usize = reader_handles
520 .into_iter()
521 .map(|h| h.join().expect("thread should not panic"))
522 .sum();
523
524 let stats = store.metrics();
525 assert_eq!(stats.total_writes, num_writes as u64);
526 assert_eq!(stats.total_reads, total_reads as u64);
527 assert_eq!(stats.active_readers, 0); }
529
530 #[test]
531 fn test_read_write_ratio() {
532 let store = MrswStore::<TripleStore>::new();
533
534 for _ in 0..10 {
536 let _ = store.read().expect("store lock should not be poisoned");
537 }
538
539 {
541 let _ = store.write().expect("store lock should not be poisoned");
542 }
543
544 let stats = store.metrics();
545 println!(
546 "Total reads: {}, Total writes: {}, Ratio: {}",
547 stats.total_reads,
548 stats.total_writes,
549 stats.read_write_ratio()
550 );
551 assert_eq!(stats.total_reads, 10);
552 assert_eq!(stats.total_writes, 1);
553 assert_eq!(stats.read_write_ratio(), 10.0);
554 assert!(stats.is_read_heavy());
555 }
556
557 #[test]
558 fn test_metrics_reset() {
559 let store = MrswStore::<TripleStore>::new();
560
561 let _ = store.read().expect("store lock should not be poisoned");
563 let _ = store.write().expect("store lock should not be poisoned");
564
565 store.reset_metrics();
567
568 let stats = store.metrics();
569 assert_eq!(stats.total_reads, 0);
570 assert_eq!(stats.total_writes, 0);
571 }
572}