Skip to main content

spider_core/state/
primitives.rs

1//! # State Primitives Module
2//!
3//! Provides ready-to-use thread-safe primitives for building Spider state structures.
4//!
5//! ## Overview
6//!
7//! This module offers a collection of thread-safe types that can be used to build
8//! custom Spider state without worrying about concurrency issues. All types are
9//! designed for high-performance concurrent access with minimal locking overhead.
10//!
11//! ## Key Types
12//!
13//! - [`Counter`]: Thread-safe atomic counter
14//! - [`Counter64`]: 64-bit thread-safe counter for large counts
15//! - [`Flag`]: Thread-safe boolean flag
16//! - [`VisitedUrls`]: Thread-safe URL tracking with DashMap
17//! - [`ConcurrentMap<K, V>`]: Thread-safe key-value map
18//! - [`ConcurrentVec<T>`]: Thread-safe dynamic vector
19//! - [`StateAccessMetrics`]: Metrics for tracking state access patterns
20//!
21//! ## Example
22//!
23//! ```rust
24//! use spider_core::state::{Counter, VisitedUrls};
25//! use std::sync::Arc;
26//!
27//! #[derive(Clone, Default)]
28//! struct MySpiderState {
29//!     page_count: Counter,
30//!     visited_urls: VisitedUrls,
31//! }
32//!
33//! impl MySpiderState {
34//!     fn increment_page_count(&self) {
35//!         self.page_count.inc();
36//!     }
37//!
38//!     fn mark_url_visited(&self, url: String) {
39//!         self.visited_urls.mark(url);
40//!     }
41//! }
42//! ```
43
44use dashmap::DashMap;
45use parking_lot::RwLock;
46use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
47
48// ============================================================================
49// Counter - Thread-safe atomic counter
50// ============================================================================
51
/// A thread-safe counter using atomic operations.
///
/// This is a thin wrapper around `AtomicUsize` that offers an ergonomic API
/// for counting through a shared reference in concurrent code.
///
/// Additions and subtractions are delegated to `fetch_add` / `fetch_sub`,
/// which wrap around on overflow rather than panicking.
///
/// ## Example
///
/// ```rust
/// use spider_core::state::Counter;
///
/// let counter = Counter::new();
/// counter.inc();
/// counter.add(5);
/// assert_eq!(counter.get(), 6);
/// ```
#[derive(Debug, Default)]
pub struct Counter(AtomicUsize);

impl Counter {
    /// Creates a new counter initialized to 0.
    pub fn new() -> Self {
        Self::with_value(0)
    }

    /// Creates a new counter with the specified initial value.
    pub fn with_value(value: usize) -> Self {
        Self(AtomicUsize::new(value))
    }

    /// Increments the counter by 1 (wrapping on overflow).
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }

    /// Decrements the counter by 1 (wrapping on underflow).
    pub fn dec(&self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }

    /// Adds `value` to the counter (wrapping on overflow).
    pub fn add(&self, value: usize) {
        self.0.fetch_add(value, Ordering::AcqRel);
    }

    /// Subtracts `value` from the counter (wrapping on underflow).
    pub fn sub(&self, value: usize) {
        self.0.fetch_sub(value, Ordering::AcqRel);
    }

    /// Gets the current value of the counter.
    pub fn get(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    /// Sets the counter to a specific value.
    pub fn set(&self, value: usize) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically replaces the counter value and returns the previous value.
    pub fn swap(&self, value: usize) -> usize {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically stores `new` if the counter currently equals `current`.
    ///
    /// Always returns the value the counter held *before* the call. The
    /// exchange succeeded if and only if the returned value equals `current`;
    /// otherwise the counter is left untouched and the returned value is what
    /// was actually observed.
    pub fn compare_and_swap(&self, current: usize, new: usize) -> usize {
        // `compare_exchange` yields the previous value in both variants, so
        // hand it back either way; callers detect failure by comparing the
        // result against `current`.
        match self
            .0
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Counter {
    /// Returns an independent counter initialized with the value currently
    /// stored in `self`; the two counters do not share state afterwards.
    fn clone(&self) -> Self {
        Self::with_value(self.get())
    }
}
129
130// ============================================================================
131// Counter64 - 64-bit thread-safe counter
132// ============================================================================
133
/// A 64-bit thread-safe counter for large counts.
///
/// Mirrors [`Counter`] but stores its value in an `AtomicU64`, so the range
/// does not depend on the platform's pointer width.
///
/// ## Example
///
/// ```rust
/// use spider_core::state::Counter64;
///
/// let counter = Counter64::new();
/// counter.add(1_000_000_000);
/// assert_eq!(counter.get(), 1_000_000_000);
/// ```
#[derive(Debug, Default)]
pub struct Counter64(AtomicU64);

impl Counter64 {
    /// Builds a counter that starts at 0.
    pub fn new() -> Self {
        Self::with_value(0)
    }

    /// Builds a counter that starts at `value`.
    pub fn with_value(value: u64) -> Self {
        Counter64(AtomicU64::new(value))
    }

    /// Adds 1 to the counter.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Subtracts 1 from the counter.
    pub fn dec(&self) {
        self.sub(1);
    }

    /// Adds `value` to the counter.
    pub fn add(&self, value: u64) {
        let Self(inner) = self;
        let _previous = inner.fetch_add(value, Ordering::AcqRel);
    }

    /// Subtracts `value` from the counter.
    pub fn sub(&self, value: u64) {
        let Self(inner) = self;
        let _previous = inner.fetch_sub(value, Ordering::AcqRel);
    }

    /// Reads the value currently held by the counter.
    pub fn get(&self) -> u64 {
        let Self(inner) = self;
        inner.load(Ordering::Acquire)
    }

    /// Overwrites the counter with `value`.
    pub fn set(&self, value: u64) {
        let Self(inner) = self;
        inner.store(value, Ordering::Release);
    }
}

impl Clone for Counter64 {
    /// Produces a separate counter seeded with the value read from `self`.
    fn clone(&self) -> Self {
        let snapshot = self.get();
        Self::with_value(snapshot)
    }
}
197
198// ============================================================================
199// Flag - Thread-safe boolean flag
200// ============================================================================
201
/// A thread-safe boolean flag.
///
/// Wraps an `AtomicBool` so a flag can be read and written through a shared
/// reference from multiple threads.
///
/// ## Example
///
/// ```rust
/// use spider_core::state::Flag;
///
/// let flag = Flag::new(false);
/// flag.set(true);
/// assert!(flag.get());
/// ```
#[derive(Debug, Default)]
pub struct Flag(AtomicBool);

impl Flag {
    /// Creates a new flag with the specified initial value.
    pub fn new(value: bool) -> Self {
        Self(AtomicBool::new(value))
    }

    /// Gets the current value of the flag.
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }

    /// Sets the flag to the specified value.
    pub fn set(&self, value: bool) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically replaces the flag value and returns the previous value.
    pub fn swap(&self, value: bool) -> bool {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically stores `new` if the flag currently equals `current`.
    ///
    /// Always returns the value the flag held *before* the call. The exchange
    /// succeeded if and only if the returned value equals `current`; otherwise
    /// the flag is left untouched and the returned value is what was actually
    /// observed.
    pub fn compare_and_swap(&self, current: bool, new: bool) -> bool {
        // Both `Ok` and `Err` carry the previous value; returning it in either
        // case lets callers tell a failed exchange apart from a successful one.
        match self
            .0
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Flag {
    /// Returns an independent flag initialized with the value currently
    /// stored in `self`; the two flags do not share state afterwards.
    fn clone(&self) -> Self {
        Self::new(self.get())
    }
}
252
253// ============================================================================
254// VisitedUrls - Thread-safe URL tracker
255// ============================================================================
256
257/// A thread-safe URL tracker using DashMap.
258///
259/// This provides efficient concurrent access for tracking visited URLs
260/// without requiring explicit locks.
261///
262/// ## Example
263///
264/// ```rust
265/// use spider_core::state::VisitedUrls;
266///
267/// let visited = VisitedUrls::new();
268/// visited.mark("https://example.com".to_string());
269/// assert!(visited.is_visited("https://example.com"));
270/// ```
271#[derive(Debug, Default)]
272pub struct VisitedUrls {
273    urls: DashMap<String, bool>,
274}
275
276impl VisitedUrls {
277    /// Creates a new empty URL tracker.
278    pub fn new() -> Self {
279        Self {
280            urls: DashMap::new(),
281        }
282    }
283
284    /// Creates a URL tracker with the specified capacity.
285    pub fn with_capacity(capacity: usize) -> Self {
286        Self {
287            urls: DashMap::with_capacity(capacity),
288        }
289    }
290
291    /// Marks a URL as visited.
292    pub fn mark(&self, url: String) {
293        self.urls.insert(url, true);
294    }
295
296    /// Checks if a URL has been visited.
297    pub fn is_visited(&self, url: &str) -> bool {
298        self.urls.contains_key(url)
299    }
300
301    /// Removes a URL from the visited set.
302    pub fn remove(&self, url: &str) {
303        self.urls.remove(url);
304    }
305
306    /// Returns the number of visited URLs.
307    pub fn len(&self) -> usize {
308        self.urls.len()
309    }
310
311    /// Returns true if no URLs have been visited.
312    pub fn is_empty(&self) -> bool {
313        self.urls.is_empty()
314    }
315
316    /// Clears all visited URLs.
317    pub fn clear(&self) {
318        self.urls.clear();
319    }
320}
321
322impl Clone for VisitedUrls {
323    fn clone(&self) -> Self {
324        Self {
325            urls: self.urls.clone(),
326        }
327    }
328}
329
330// ============================================================================
331// ConcurrentMap - Thread-safe key-value map
332// ============================================================================
333
334/// A thread-safe key-value map using DashMap.
335///
336/// Provides concurrent read/write access without explicit locking.
337///
338/// ## Example
339///
340/// ```rust
341/// use spider_core::state::ConcurrentMap;
342///
343/// let map = ConcurrentMap::new();
344/// map.insert("key".to_string(), 42);
345/// assert_eq!(map.get(&"key".to_string()), Some(42));
346/// ```
347pub struct ConcurrentMap<K, V> {
348    map: DashMap<K, V>,
349}
350
351impl<K, V> Default for ConcurrentMap<K, V>
352where
353    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
354    V: Clone + Send + Sync,
355{
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361impl<K, V> std::fmt::Debug for ConcurrentMap<K, V>
362where
363    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static + std::fmt::Debug,
364    V: Clone + Send + Sync + std::fmt::Debug,
365{
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.debug_struct("ConcurrentMap")
368            .field("count", &self.map.len())
369            .finish()
370    }
371}
372
373impl<K, V> ConcurrentMap<K, V>
374where
375    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
376    V: Clone + Send + Sync,
377{
378    /// Creates a new empty map.
379    pub fn new() -> Self {
380        Self {
381            map: DashMap::new(),
382        }
383    }
384
385    /// Creates a map with the specified capacity.
386    pub fn with_capacity(capacity: usize) -> Self {
387        Self {
388            map: DashMap::with_capacity(capacity),
389        }
390    }
391
392    /// Inserts a key-value pair.
393    pub fn insert(&self, key: K, value: V) -> Option<V> {
394        self.map.insert(key, value)
395    }
396
397    /// Gets a reference to a value.
398    pub fn get(&self, key: &K) -> Option<V> {
399        self.map.get(key).map(|ref_multi| ref_multi.clone())
400    }
401
402    /// Removes a key from the map.
403    pub fn remove(&self, key: &K) -> Option<V> {
404        self.map.remove(key).map(|(_, v)| v)
405    }
406
407    /// Returns the number of entries in the map.
408    pub fn len(&self) -> usize {
409        self.map.len()
410    }
411
412    /// Returns true if the map is empty.
413    pub fn is_empty(&self) -> bool {
414        self.map.is_empty()
415    }
416
417    /// Clears all entries.
418    pub fn clear(&self) {
419        self.map.clear();
420    }
421
422    /// Returns true if the map contains the key.
423    pub fn contains_key(&self, key: &K) -> bool {
424        self.map.contains_key(key)
425    }
426}
427
428impl<K, V> Clone for ConcurrentMap<K, V>
429where
430    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
431    V: Clone + Send + Sync,
432{
433    fn clone(&self) -> Self {
434        Self {
435            map: self.map.clone(),
436        }
437    }
438}
439
440// ============================================================================
441// ConcurrentVec - Thread-safe vector
442// ============================================================================
443
444/// A thread-safe vector using RwLock.
445///
446/// Provides concurrent read access with exclusive write access.
447///
448/// ## Example
449///
450/// ```rust
451/// use spider_core::state::ConcurrentVec;
452///
453/// let vec = ConcurrentVec::new();
454/// vec.push(1);
455/// vec.push(2);
456/// assert_eq!(vec.len(), 2);
457/// ```
458#[derive(Debug, Default)]
459pub struct ConcurrentVec<T> {
460    vec: RwLock<Vec<T>>,
461}
462
463impl<T> ConcurrentVec<T>
464where
465    T: Clone + Send + Sync + 'static,
466{
467    /// Creates a new empty vector.
468    pub fn new() -> Self {
469        Self {
470            vec: RwLock::new(Vec::new()),
471        }
472    }
473
474    /// Creates a vector with the specified capacity.
475    pub fn with_capacity(capacity: usize) -> Self {
476        Self {
477            vec: RwLock::new(Vec::with_capacity(capacity)),
478        }
479    }
480
481    /// Pushes an element to the vector.
482    pub fn push(&self, value: T) {
483        self.vec.write().push(value);
484    }
485
486    /// Returns the number of elements.
487    pub fn len(&self) -> usize {
488        self.vec.read().len()
489    }
490
491    /// Returns true if the vector is empty.
492    pub fn is_empty(&self) -> bool {
493        self.vec.read().is_empty()
494    }
495
496    /// Clears all elements.
497    pub fn clear(&self) {
498        self.vec.write().clear();
499    }
500
501    /// Returns a copy of all elements.
502    pub fn to_vec(&self) -> Vec<T> {
503        self.vec.read().clone()
504    }
505}
506
507impl<T> Clone for ConcurrentVec<T>
508where
509    T: Clone + Send + Sync + 'static,
510{
511    fn clone(&self) -> Self {
512        Self {
513            vec: RwLock::new(self.vec.read().clone()),
514        }
515    }
516}
517
518// ============================================================================
519// StateAccessMetrics - Metrics for tracking state access patterns
520// ============================================================================
521
/// Metrics for tracking state access patterns.
///
/// Keeps four atomic counters: total reads, total writes, the number of
/// accesses currently in flight, and the highest in-flight count observed.
///
/// ## Example
///
/// ```rust
/// use spider_core::state::StateAccessMetrics;
///
/// let metrics = StateAccessMetrics::new();
/// metrics.record_read();
/// metrics.record_write();
/// println!("Reads: {}, Writes: {}", metrics.read_count(), metrics.write_count());
/// ```
#[derive(Debug, Default)]
pub struct StateAccessMetrics {
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    concurrent_access_peak: AtomicUsize,
    current_concurrent: AtomicUsize,
}

impl StateAccessMetrics {
    /// Creates a new metrics tracker with every counter at 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a read access.
    pub fn record_read(&self) {
        self.read_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records a write access.
    pub fn record_write(&self) {
        self.write_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records the start of an access (read or write).
    ///
    /// Increments the in-flight gauge and raises the recorded peak if the new
    /// in-flight count exceeds it.
    pub fn record_access_start(&self) {
        let previous = self.current_concurrent.fetch_add(1, Ordering::AcqRel);
        // `fetch_add` wraps, so mirror that to get the value actually stored.
        let now_active = previous.wrapping_add(1);
        // `fetch_max` is a single atomic read-modify-write: unlike a one-shot
        // load + compare_exchange, a racing update from another thread can
        // never cause a higher peak to be dropped.
        self.concurrent_access_peak
            .fetch_max(now_active, Ordering::AcqRel);
    }

    /// Records the end of an access.
    ///
    /// The in-flight gauge saturates at 0: an unmatched call (for example one
    /// arriving after [`reset`](Self::reset) zeroed the gauge) is ignored
    /// instead of wrapping the gauge around to `usize::MAX`.
    pub fn record_access_end(&self) {
        // `checked_sub` returns `None` at 0, which makes `fetch_update` leave
        // the gauge untouched; that outcome is deliberately ignored.
        let _ = self.current_concurrent.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |active| active.checked_sub(1),
        );
    }

    /// Returns the total number of read accesses.
    pub fn read_count(&self) -> usize {
        self.read_count.load(Ordering::Acquire)
    }

    /// Returns the total number of write accesses.
    pub fn write_count(&self) -> usize {
        self.write_count.load(Ordering::Acquire)
    }

    /// Returns the peak concurrent access count.
    pub fn concurrent_access_peak(&self) -> usize {
        self.concurrent_access_peak.load(Ordering::Acquire)
    }

    /// Returns the current concurrent access count.
    pub fn current_concurrent(&self) -> usize {
        self.current_concurrent.load(Ordering::Acquire)
    }

    /// Resets all counters to 0.
    ///
    /// The four stores are independent, so readers racing with a reset may
    /// briefly observe a mix of old and zeroed values.
    pub fn reset(&self) {
        self.read_count.store(0, Ordering::Release);
        self.write_count.store(0, Ordering::Release);
        self.concurrent_access_peak.store(0, Ordering::Release);
        self.current_concurrent.store(0, Ordering::Release);
    }
}

impl Clone for StateAccessMetrics {
    /// Returns an independent tracker initialized from the values currently
    /// stored in `self`. Each field is loaded separately, so the copy is not
    /// an atomic snapshot of all four counters.
    fn clone(&self) -> Self {
        Self {
            read_count: AtomicUsize::new(self.read_count.load(Ordering::Acquire)),
            write_count: AtomicUsize::new(self.write_count.load(Ordering::Acquire)),
            concurrent_access_peak: AtomicUsize::new(
                self.concurrent_access_peak.load(Ordering::Acquire),
            ),
            current_concurrent: AtomicUsize::new(self.current_concurrent.load(Ordering::Acquire)),
        }
    }
}