// spider_core/state/primitives.rs

//! Thread-safe building blocks for custom spider state.
//!
//! These types are intentionally small and practical: counters, flags, and a
//! few concurrent collections that are handy when parse tasks run in parallel.
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::state::{Counter, VisitedUrls};
//! use std::sync::Arc;
//!
//! #[derive(Clone, Default)]
//! struct MySpiderState {
//!     page_count: Counter,
//!     visited_urls: VisitedUrls,
//! }
//!
//! impl MySpiderState {
//!     fn increment_page_count(&self) {
//!         self.page_count.inc();
//!     }
//!
//!     fn mark_url_visited(&self, url: String) {
//!         self.visited_urls.mark(url);
//!     }
//! }
//! ```

29use dashmap::DashMap;
30use parking_lot::RwLock;
31use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
32
// ============================================================================
// Counter - Thread-safe atomic counter
// ============================================================================

/// A thread-safe counter backed by an `AtomicUsize`.
///
/// Every method takes `&self`, so one counter can be updated from several
/// threads through a shared reference.
///
/// Arithmetic follows the underlying atomic operations: additions and
/// subtractions wrap around on overflow/underflow instead of panicking
/// (e.g. calling [`Counter::dec`] on a counter holding `0` yields
/// `usize::MAX`).
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Counter;
///
/// let counter = Counter::new();
/// counter.inc();
/// counter.add(5);
/// assert_eq!(counter.get(), 6);
/// ```
#[derive(Debug, Default)]
pub struct Counter(AtomicUsize);

impl Counter {
    /// Creates a new counter initialized to 0.
    pub fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Creates a new counter with the specified initial value.
    pub fn with_value(value: usize) -> Self {
        Self(AtomicUsize::new(value))
    }

    /// Increments the counter by 1 (wrapping on overflow).
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }

    /// Decrements the counter by 1 (wrapping on underflow).
    pub fn dec(&self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }

    /// Adds `value` to the counter (wrapping on overflow).
    pub fn add(&self, value: usize) {
        self.0.fetch_add(value, Ordering::AcqRel);
    }

    /// Subtracts `value` from the counter (wrapping on underflow).
    pub fn sub(&self, value: usize) {
        self.0.fetch_sub(value, Ordering::AcqRel);
    }

    /// Returns the current value of the counter.
    pub fn get(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    /// Overwrites the counter with `value`.
    pub fn set(&self, value: usize) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically replaces the counter with `value` and returns the old value.
    pub fn swap(&self, value: usize) -> usize {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically stores `new` if the counter currently equals `current`.
    ///
    /// Returns the value the counter held immediately before the call. The
    /// store took place if and only if the returned value equals `current`;
    /// on a mismatch the counter is left untouched and the value actually
    /// observed is returned.
    pub fn compare_and_swap(&self, current: usize, new: usize) -> usize {
        // Both arms carry the previously stored value: `Ok` when the exchange
        // succeeded, `Err` when the comparison failed. Returning it in both
        // cases is what lets callers tell the two outcomes apart.
        match self
            .0
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Counter {
    /// Creates an independent counter initialized with this counter's current
    /// value. The clone does not share state with the original.
    fn clone(&self) -> Self {
        Self::with_value(self.get())
    }
}

// ============================================================================
// Counter64 - 64-bit thread-safe counter
// ============================================================================

/// A 64-bit thread-safe counter for large counts.
///
/// Similar to [`Counter`] but backed by an `AtomicU64`, so the range does not
/// depend on the platform's pointer width.
///
/// Additions and subtractions wrap around on overflow/underflow, following
/// the underlying atomic operations.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Counter64;
///
/// let counter = Counter64::new();
/// counter.add(1_000_000_000);
/// assert_eq!(counter.get(), 1_000_000_000);
/// ```
#[derive(Debug, Default)]
pub struct Counter64(AtomicU64);

impl Counter64 {
    /// Creates a new counter initialized to 0.
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Creates a new counter with the specified initial value.
    pub fn with_value(value: u64) -> Self {
        Self(AtomicU64::new(value))
    }

    /// Increments the counter by 1 (wrapping on overflow).
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }

    /// Decrements the counter by 1 (wrapping on underflow).
    pub fn dec(&self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }

    /// Adds `value` to the counter (wrapping on overflow).
    pub fn add(&self, value: u64) {
        self.0.fetch_add(value, Ordering::AcqRel);
    }

    /// Subtracts `value` from the counter (wrapping on underflow).
    pub fn sub(&self, value: u64) {
        self.0.fetch_sub(value, Ordering::AcqRel);
    }

    /// Returns the current value of the counter.
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }

    /// Overwrites the counter with `value`.
    pub fn set(&self, value: u64) {
        self.0.store(value, Ordering::Release);
    }
}

impl Clone for Counter64 {
    /// Creates an independent counter initialized with this counter's current
    /// value. The clone does not share state with the original.
    fn clone(&self) -> Self {
        Self::with_value(self.get())
    }
}

// ============================================================================
// Flag - Thread-safe boolean flag
// ============================================================================

/// A thread-safe boolean flag backed by an `AtomicBool`.
///
/// Useful for tracking state flags that need to be accessed from multiple threads.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Flag;
///
/// let flag = Flag::new(false);
/// flag.set(true);
/// assert!(flag.get());
/// ```
#[derive(Debug, Default)]
pub struct Flag(AtomicBool);

impl Flag {
    /// Creates a new flag with the specified initial value.
    pub fn new(value: bool) -> Self {
        Self(AtomicBool::new(value))
    }

    /// Returns the current value of the flag.
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }

    /// Sets the flag to the specified value.
    pub fn set(&self, value: bool) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically replaces the flag with `value` and returns the old value.
    pub fn swap(&self, value: bool) -> bool {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically stores `new` if the flag currently equals `current`.
    ///
    /// Returns the value the flag held immediately before the call. The store
    /// took place if and only if the returned value equals `current`; on a
    /// mismatch the flag is left untouched and the value actually observed is
    /// returned.
    pub fn compare_and_swap(&self, current: bool, new: bool) -> bool {
        // `Ok` and `Err` both carry the previously stored value; returning it
        // in either case lets callers distinguish success from failure.
        match self
            .0
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Flag {
    /// Creates an independent flag initialized with this flag's current value.
    /// The clone does not share state with the original.
    fn clone(&self) -> Self {
        Self::new(self.get())
    }
}

238// ============================================================================
239// VisitedUrls - Thread-safe URL tracker
240// ============================================================================
241
242/// A thread-safe URL tracker using DashMap.
243///
244/// This provides efficient concurrent access for tracking visited URLs
245/// without requiring explicit locks.
246///
247/// ## Example
248///
249/// ```rust,ignore
250/// use spider_core::state::VisitedUrls;
251///
252/// let visited = VisitedUrls::new();
253/// visited.mark("https://example.com".to_string());
254/// assert!(visited.is_visited("https://example.com"));
255/// ```
256#[derive(Debug, Default)]
257pub struct VisitedUrls {
258    urls: DashMap<String, bool>,
259}
260
261impl VisitedUrls {
262    /// Creates a new empty URL tracker.
263    pub fn new() -> Self {
264        Self {
265            urls: DashMap::new(),
266        }
267    }
268
269    /// Creates a URL tracker with the specified capacity.
270    pub fn with_capacity(capacity: usize) -> Self {
271        Self {
272            urls: DashMap::with_capacity(capacity),
273        }
274    }
275
276    /// Marks a URL as visited.
277    pub fn mark(&self, url: String) {
278        self.urls.insert(url, true);
279    }
280
281    /// Checks if a URL has been visited.
282    pub fn is_visited(&self, url: &str) -> bool {
283        self.urls.contains_key(url)
284    }
285
286    /// Removes a URL from the visited set.
287    pub fn remove(&self, url: &str) {
288        self.urls.remove(url);
289    }
290
291    /// Returns the number of visited URLs.
292    pub fn len(&self) -> usize {
293        self.urls.len()
294    }
295
296    /// Returns true if no URLs have been visited.
297    pub fn is_empty(&self) -> bool {
298        self.urls.is_empty()
299    }
300
301    /// Clears all visited URLs.
302    pub fn clear(&self) {
303        self.urls.clear();
304    }
305}
306
307impl Clone for VisitedUrls {
308    fn clone(&self) -> Self {
309        Self {
310            urls: self.urls.clone(),
311        }
312    }
313}
314
315// ============================================================================
316// ConcurrentMap - Thread-safe key-value map
317// ============================================================================
318
319/// A thread-safe key-value map using DashMap.
320///
321/// Provides concurrent read/write access without explicit locking.
322///
323/// ## Example
324///
325/// ```rust,ignore
326/// use spider_core::state::ConcurrentMap;
327///
328/// let map = ConcurrentMap::new();
329/// map.insert("key".to_string(), 42);
330/// assert_eq!(map.get(&"key".to_string()), Some(42));
331/// ```
332pub struct ConcurrentMap<K, V> {
333    map: DashMap<K, V>,
334}
335
336impl<K, V> Default for ConcurrentMap<K, V>
337where
338    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
339    V: Clone + Send + Sync,
340{
341    fn default() -> Self {
342        Self::new()
343    }
344}
345
346impl<K, V> std::fmt::Debug for ConcurrentMap<K, V>
347where
348    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static + std::fmt::Debug,
349    V: Clone + Send + Sync + std::fmt::Debug,
350{
351    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352        f.debug_struct("ConcurrentMap")
353            .field("count", &self.map.len())
354            .finish()
355    }
356}
357
358impl<K, V> ConcurrentMap<K, V>
359where
360    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
361    V: Clone + Send + Sync,
362{
363    /// Creates a new empty map.
364    pub fn new() -> Self {
365        Self {
366            map: DashMap::new(),
367        }
368    }
369
370    /// Creates a map with the specified capacity.
371    pub fn with_capacity(capacity: usize) -> Self {
372        Self {
373            map: DashMap::with_capacity(capacity),
374        }
375    }
376
377    /// Inserts a key-value pair.
378    pub fn insert(&self, key: K, value: V) -> Option<V> {
379        self.map.insert(key, value)
380    }
381
382    /// Gets a reference to a value.
383    pub fn get(&self, key: &K) -> Option<V> {
384        self.map.get(key).map(|ref_multi| ref_multi.clone())
385    }
386
387    /// Removes a key from the map.
388    pub fn remove(&self, key: &K) -> Option<V> {
389        self.map.remove(key).map(|(_, v)| v)
390    }
391
392    /// Returns the number of entries in the map.
393    pub fn len(&self) -> usize {
394        self.map.len()
395    }
396
397    /// Returns true if the map is empty.
398    pub fn is_empty(&self) -> bool {
399        self.map.is_empty()
400    }
401
402    /// Clears all entries.
403    pub fn clear(&self) {
404        self.map.clear();
405    }
406
407    /// Returns true if the map contains the key.
408    pub fn contains_key(&self, key: &K) -> bool {
409        self.map.contains_key(key)
410    }
411}
412
413impl<K, V> Clone for ConcurrentMap<K, V>
414where
415    K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
416    V: Clone + Send + Sync,
417{
418    fn clone(&self) -> Self {
419        Self {
420            map: self.map.clone(),
421        }
422    }
423}
424
425// ============================================================================
426// ConcurrentVec - Thread-safe vector
427// ============================================================================
428
429/// A thread-safe vector using RwLock.
430///
431/// Provides concurrent read access with exclusive write access.
432///
433/// ## Example
434///
435/// ```rust,ignore
436/// use spider_core::state::ConcurrentVec;
437///
438/// let vec = ConcurrentVec::new();
439/// vec.push(1);
440/// vec.push(2);
441/// assert_eq!(vec.len(), 2);
442/// ```
443#[derive(Debug, Default)]
444pub struct ConcurrentVec<T> {
445    vec: RwLock<Vec<T>>,
446}
447
448impl<T> ConcurrentVec<T>
449where
450    T: Clone + Send + Sync + 'static,
451{
452    /// Creates a new empty vector.
453    pub fn new() -> Self {
454        Self {
455            vec: RwLock::new(Vec::new()),
456        }
457    }
458
459    /// Creates a vector with the specified capacity.
460    pub fn with_capacity(capacity: usize) -> Self {
461        Self {
462            vec: RwLock::new(Vec::with_capacity(capacity)),
463        }
464    }
465
466    /// Pushes an element to the vector.
467    pub fn push(&self, value: T) {
468        self.vec.write().push(value);
469    }
470
471    /// Returns the number of elements.
472    pub fn len(&self) -> usize {
473        self.vec.read().len()
474    }
475
476    /// Returns true if the vector is empty.
477    pub fn is_empty(&self) -> bool {
478        self.vec.read().is_empty()
479    }
480
481    /// Clears all elements.
482    pub fn clear(&self) {
483        self.vec.write().clear();
484    }
485
486    /// Returns a copy of all elements.
487    pub fn to_vec(&self) -> Vec<T> {
488        self.vec.read().clone()
489    }
490}
491
492impl<T> Clone for ConcurrentVec<T>
493where
494    T: Clone + Send + Sync + 'static,
495{
496    fn clone(&self) -> Self {
497        Self {
498            vec: RwLock::new(self.vec.read().clone()),
499        }
500    }
501}
502
// ============================================================================
// StateAccessMetrics - Metrics for tracking state access patterns
// ============================================================================

/// Metrics for tracking state access patterns.
///
/// Useful for debugging and performance monitoring of state access. Read and
/// write totals are plain counters; the concurrent-access figures are driven
/// by paired calls to [`StateAccessMetrics::record_access_start`] and
/// [`StateAccessMetrics::record_access_end`].
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::StateAccessMetrics;
///
/// let metrics = StateAccessMetrics::new();
/// metrics.record_read();
/// metrics.record_write();
/// println!("Reads: {}, Writes: {}", metrics.read_count(), metrics.write_count());
/// ```
#[derive(Debug, Default)]
pub struct StateAccessMetrics {
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    concurrent_access_peak: AtomicUsize,
    current_concurrent: AtomicUsize,
}

impl StateAccessMetrics {
    /// Creates a new metrics tracker with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a read access.
    pub fn record_read(&self) {
        self.read_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records a write access.
    pub fn record_write(&self) {
        self.write_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records the start of an access (read or write).
    ///
    /// Increments the in-flight count and raises the recorded peak if the new
    /// in-flight count exceeds it.
    pub fn record_access_start(&self) {
        let previous = self.current_concurrent.fetch_add(1, Ordering::AcqRel);
        let in_flight = previous.saturating_add(1);
        // `fetch_max` keeps the larger of the stored and supplied values in a
        // single atomic step, so a concurrent update of the peak cannot cause
        // this thread's higher observation to be dropped.
        self.concurrent_access_peak
            .fetch_max(in_flight, Ordering::AcqRel);
    }

    /// Records the end of an access.
    ///
    /// The in-flight count is clamped at zero: an unmatched call (for example
    /// one that arrives after [`StateAccessMetrics::reset`] zeroed the count
    /// while an access was still running) is ignored instead of wrapping the
    /// counter around to `usize::MAX`.
    pub fn record_access_end(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the stored value untouched; that outcome is intentional.
        let _ = self.current_concurrent.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |in_flight| in_flight.checked_sub(1),
        );
    }

    /// Returns the total number of read accesses.
    pub fn read_count(&self) -> usize {
        self.read_count.load(Ordering::Acquire)
    }

    /// Returns the total number of write accesses.
    pub fn write_count(&self) -> usize {
        self.write_count.load(Ordering::Acquire)
    }

    /// Returns the peak concurrent access count.
    pub fn concurrent_access_peak(&self) -> usize {
        self.concurrent_access_peak.load(Ordering::Acquire)
    }

    /// Returns the current concurrent access count.
    pub fn current_concurrent(&self) -> usize {
        self.current_concurrent.load(Ordering::Acquire)
    }

    /// Resets all counters, including the in-flight count, to zero.
    ///
    /// The four stores are independent; a reader running concurrently may
    /// observe some counters already reset and others not yet.
    pub fn reset(&self) {
        self.read_count.store(0, Ordering::Release);
        self.write_count.store(0, Ordering::Release);
        self.concurrent_access_peak.store(0, Ordering::Release);
        self.current_concurrent.store(0, Ordering::Release);
    }
}

impl Clone for StateAccessMetrics {
    /// Creates an independent tracker initialized with the current counter
    /// values. Each field is loaded separately, so the copy is not a single
    /// atomic snapshot of all four counters.
    fn clone(&self) -> Self {
        Self {
            read_count: AtomicUsize::new(self.read_count()),
            write_count: AtomicUsize::new(self.write_count()),
            concurrent_access_peak: AtomicUsize::new(self.concurrent_access_peak()),
            current_concurrent: AtomicUsize::new(self.current_concurrent()),
        }
    }
}