spider_core/state/primitives.rs
1//! Thread-safe building blocks for custom spider state.
2//!
3//! These types are intentionally small and practical: counters, flags, and a
4//! few concurrent collections that are handy when parse tasks run in parallel.
5//!
6//! ## Example
7//!
8//! ```rust,ignore
9//! use spider_core::state::{Counter, VisitedUrls};
10//! use std::sync::Arc;
11//!
12//! #[derive(Clone, Default)]
13//! struct MySpiderState {
14//! page_count: Counter,
15//! visited_urls: VisitedUrls,
16//! }
17//!
18//! impl MySpiderState {
19//! fn increment_page_count(&self) {
20//! self.page_count.inc();
21//! }
22//!
23//! fn mark_url_visited(&self, url: String) {
24//! self.visited_urls.mark(url);
25//! }
26//! }
27//! ```
28
29use dashmap::DashMap;
30use parking_lot::RwLock;
31use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
32
33// ============================================================================
34// Counter - Thread-safe atomic counter
35// ============================================================================
36
/// A thread-safe counter backed by an [`AtomicUsize`].
///
/// Every method takes `&self`, so a `Counter` can be updated through a shared
/// reference. Additions and subtractions go through the atomic `fetch_add` /
/// `fetch_sub` operations, which wrap around on overflow instead of panicking.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Counter;
///
/// let counter = Counter::new();
/// counter.inc();
/// counter.add(5);
/// assert_eq!(counter.get(), 6);
/// ```
#[derive(Debug, Default)]
pub struct Counter(AtomicUsize);

impl Counter {
    /// Creates a new counter initialized to 0.
    pub fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Creates a new counter with the specified initial value.
    pub fn with_value(value: usize) -> Self {
        Self(AtomicUsize::new(value))
    }

    /// Increments the counter by 1.
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::AcqRel);
    }

    /// Decrements the counter by 1.
    pub fn dec(&self) {
        self.0.fetch_sub(1, Ordering::AcqRel);
    }

    /// Adds `value` to the counter.
    pub fn add(&self, value: usize) {
        self.0.fetch_add(value, Ordering::AcqRel);
    }

    /// Subtracts `value` from the counter.
    pub fn sub(&self, value: usize) {
        self.0.fetch_sub(value, Ordering::AcqRel);
    }

    /// Returns the current value of the counter.
    pub fn get(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    /// Overwrites the counter with `value`.
    pub fn set(&self, value: usize) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically stores `value` and returns the value it replaced.
    pub fn swap(&self, value: usize) -> usize {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically replaces the value with `new` if it currently equals `current`.
    ///
    /// Returns the value the counter held immediately before the call. The
    /// exchange happened if and only if that value equals `current`; on a
    /// mismatch the counter is left unchanged and the value actually found is
    /// returned, so the caller can tell the two outcomes apart.
    pub fn compare_and_swap(&self, current: usize, new: usize) -> usize {
        match self.0.compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire) {
            // `Ok` carries the old value on success and `Err` the conflicting
            // value on failure; either way it is the previous value.
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Counter {
    /// Produces an independent counter seeded with the value read at the time
    /// of the call; later updates to either counter do not affect the other.
    fn clone(&self) -> Self {
        Self::with_value(self.get())
    }
}
114
115// ============================================================================
116// Counter64 - 64-bit thread-safe counter
117// ============================================================================
118
/// A thread-safe counter with a fixed 64-bit range.
///
/// Offers the same counting operations as [`Counter`], but is stored in an
/// `AtomicU64` so its width does not depend on the platform's pointer size.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Counter64;
///
/// let counter = Counter64::new();
/// counter.add(1_000_000_000);
/// assert_eq!(counter.get(), 1_000_000_000);
/// ```
#[derive(Debug, Default)]
pub struct Counter64(AtomicU64);

impl Counter64 {
    /// Builds a counter that starts at zero.
    pub fn new() -> Self {
        Self::with_value(0)
    }

    /// Builds a counter that starts at `value`.
    pub fn with_value(value: u64) -> Self {
        let inner = AtomicU64::new(value);
        Self(inner)
    }

    /// Bumps the counter up by one.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Bumps the counter down by one.
    pub fn dec(&self) {
        self.sub(1);
    }

    /// Atomically adds `value` to the counter.
    pub fn add(&self, value: u64) {
        // The previous value returned by `fetch_add` is not needed.
        let _ = self.0.fetch_add(value, Ordering::AcqRel);
    }

    /// Atomically subtracts `value` from the counter.
    pub fn sub(&self, value: u64) {
        // The previous value returned by `fetch_sub` is not needed.
        let _ = self.0.fetch_sub(value, Ordering::AcqRel);
    }

    /// Reads the counter's current value.
    pub fn get(&self) -> u64 {
        let Self(inner) = self;
        inner.load(Ordering::Acquire)
    }

    /// Replaces the counter's value with `value`.
    pub fn set(&self, value: u64) {
        let Self(inner) = self;
        inner.store(value, Ordering::Release);
    }
}

impl Clone for Counter64 {
    /// Returns a new, independent counter holding the value read at call time.
    fn clone(&self) -> Self {
        let snapshot = self.0.load(Ordering::Acquire);
        Self(AtomicU64::new(snapshot))
    }
}
182
183// ============================================================================
184// Flag - Thread-safe boolean flag
185// ============================================================================
186
/// A thread-safe boolean flag backed by an [`AtomicBool`].
///
/// Every method takes `&self`, so the flag can be read and written through a
/// shared reference.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::Flag;
///
/// let flag = Flag::new(false);
/// flag.set(true);
/// assert!(flag.get());
/// ```
#[derive(Debug, Default)]
pub struct Flag(AtomicBool);

impl Flag {
    /// Creates a new flag with the specified initial value.
    pub fn new(value: bool) -> Self {
        Self(AtomicBool::new(value))
    }

    /// Returns the current value of the flag.
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }

    /// Sets the flag to `value`.
    pub fn set(&self, value: bool) {
        self.0.store(value, Ordering::Release);
    }

    /// Atomically stores `value` and returns the value it replaced.
    pub fn swap(&self, value: bool) -> bool {
        self.0.swap(value, Ordering::AcqRel)
    }

    /// Atomically replaces the flag with `new` if it currently equals `current`.
    ///
    /// Returns the value the flag held immediately before the call. The
    /// exchange happened if and only if that value equals `current`; on a
    /// mismatch the flag is left unchanged and the value actually found is
    /// returned, so the caller can tell the two outcomes apart.
    pub fn compare_and_swap(&self, current: bool, new: bool) -> bool {
        match self.0.compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire) {
            // `Ok` carries the old value on success and `Err` the conflicting
            // value on failure; either way it is the previous value.
            Ok(previous) | Err(previous) => previous,
        }
    }
}

impl Clone for Flag {
    /// Produces an independent flag seeded with the value read at the time of
    /// the call; later updates to either flag do not affect the other.
    fn clone(&self) -> Self {
        Self::new(self.get())
    }
}
237
238// ============================================================================
239// VisitedUrls - Thread-safe URL tracker
240// ============================================================================
241
242/// A thread-safe URL tracker using DashMap.
243///
244/// This provides efficient concurrent access for tracking visited URLs
245/// without requiring explicit locks.
246///
247/// ## Example
248///
249/// ```rust,ignore
250/// use spider_core::state::VisitedUrls;
251///
252/// let visited = VisitedUrls::new();
253/// visited.mark("https://example.com".to_string());
254/// assert!(visited.is_visited("https://example.com"));
255/// ```
256#[derive(Debug, Default)]
257pub struct VisitedUrls {
258 urls: DashMap<String, bool>,
259}
260
261impl VisitedUrls {
262 /// Creates a new empty URL tracker.
263 pub fn new() -> Self {
264 Self {
265 urls: DashMap::new(),
266 }
267 }
268
269 /// Creates a URL tracker with the specified capacity.
270 pub fn with_capacity(capacity: usize) -> Self {
271 Self {
272 urls: DashMap::with_capacity(capacity),
273 }
274 }
275
276 /// Marks a URL as visited.
277 pub fn mark(&self, url: String) {
278 self.urls.insert(url, true);
279 }
280
281 /// Checks if a URL has been visited.
282 pub fn is_visited(&self, url: &str) -> bool {
283 self.urls.contains_key(url)
284 }
285
286 /// Removes a URL from the visited set.
287 pub fn remove(&self, url: &str) {
288 self.urls.remove(url);
289 }
290
291 /// Returns the number of visited URLs.
292 pub fn len(&self) -> usize {
293 self.urls.len()
294 }
295
296 /// Returns true if no URLs have been visited.
297 pub fn is_empty(&self) -> bool {
298 self.urls.is_empty()
299 }
300
301 /// Clears all visited URLs.
302 pub fn clear(&self) {
303 self.urls.clear();
304 }
305}
306
307impl Clone for VisitedUrls {
308 fn clone(&self) -> Self {
309 Self {
310 urls: self.urls.clone(),
311 }
312 }
313}
314
315// ============================================================================
316// ConcurrentMap - Thread-safe key-value map
317// ============================================================================
318
319/// A thread-safe key-value map using DashMap.
320///
321/// Provides concurrent read/write access without explicit locking.
322///
323/// ## Example
324///
325/// ```rust,ignore
326/// use spider_core::state::ConcurrentMap;
327///
328/// let map = ConcurrentMap::new();
329/// map.insert("key".to_string(), 42);
330/// assert_eq!(map.get(&"key".to_string()), Some(42));
331/// ```
332pub struct ConcurrentMap<K, V> {
333 map: DashMap<K, V>,
334}
335
336impl<K, V> Default for ConcurrentMap<K, V>
337where
338 K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
339 V: Clone + Send + Sync,
340{
341 fn default() -> Self {
342 Self::new()
343 }
344}
345
346impl<K, V> std::fmt::Debug for ConcurrentMap<K, V>
347where
348 K: Eq + std::hash::Hash + Clone + Send + Sync + 'static + std::fmt::Debug,
349 V: Clone + Send + Sync + std::fmt::Debug,
350{
351 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
352 f.debug_struct("ConcurrentMap")
353 .field("count", &self.map.len())
354 .finish()
355 }
356}
357
358impl<K, V> ConcurrentMap<K, V>
359where
360 K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
361 V: Clone + Send + Sync,
362{
363 /// Creates a new empty map.
364 pub fn new() -> Self {
365 Self {
366 map: DashMap::new(),
367 }
368 }
369
370 /// Creates a map with the specified capacity.
371 pub fn with_capacity(capacity: usize) -> Self {
372 Self {
373 map: DashMap::with_capacity(capacity),
374 }
375 }
376
377 /// Inserts a key-value pair.
378 pub fn insert(&self, key: K, value: V) -> Option<V> {
379 self.map.insert(key, value)
380 }
381
382 /// Gets a reference to a value.
383 pub fn get(&self, key: &K) -> Option<V> {
384 self.map.get(key).map(|ref_multi| ref_multi.clone())
385 }
386
387 /// Removes a key from the map.
388 pub fn remove(&self, key: &K) -> Option<V> {
389 self.map.remove(key).map(|(_, v)| v)
390 }
391
392 /// Returns the number of entries in the map.
393 pub fn len(&self) -> usize {
394 self.map.len()
395 }
396
397 /// Returns true if the map is empty.
398 pub fn is_empty(&self) -> bool {
399 self.map.is_empty()
400 }
401
402 /// Clears all entries.
403 pub fn clear(&self) {
404 self.map.clear();
405 }
406
407 /// Returns true if the map contains the key.
408 pub fn contains_key(&self, key: &K) -> bool {
409 self.map.contains_key(key)
410 }
411}
412
413impl<K, V> Clone for ConcurrentMap<K, V>
414where
415 K: Eq + std::hash::Hash + Clone + Send + Sync + 'static,
416 V: Clone + Send + Sync,
417{
418 fn clone(&self) -> Self {
419 Self {
420 map: self.map.clone(),
421 }
422 }
423}
424
425// ============================================================================
426// ConcurrentVec - Thread-safe vector
427// ============================================================================
428
429/// A thread-safe vector using RwLock.
430///
431/// Provides concurrent read access with exclusive write access.
432///
433/// ## Example
434///
435/// ```rust,ignore
436/// use spider_core::state::ConcurrentVec;
437///
438/// let vec = ConcurrentVec::new();
439/// vec.push(1);
440/// vec.push(2);
441/// assert_eq!(vec.len(), 2);
442/// ```
443#[derive(Debug, Default)]
444pub struct ConcurrentVec<T> {
445 vec: RwLock<Vec<T>>,
446}
447
448impl<T> ConcurrentVec<T>
449where
450 T: Clone + Send + Sync + 'static,
451{
452 /// Creates a new empty vector.
453 pub fn new() -> Self {
454 Self {
455 vec: RwLock::new(Vec::new()),
456 }
457 }
458
459 /// Creates a vector with the specified capacity.
460 pub fn with_capacity(capacity: usize) -> Self {
461 Self {
462 vec: RwLock::new(Vec::with_capacity(capacity)),
463 }
464 }
465
466 /// Pushes an element to the vector.
467 pub fn push(&self, value: T) {
468 self.vec.write().push(value);
469 }
470
471 /// Returns the number of elements.
472 pub fn len(&self) -> usize {
473 self.vec.read().len()
474 }
475
476 /// Returns true if the vector is empty.
477 pub fn is_empty(&self) -> bool {
478 self.vec.read().is_empty()
479 }
480
481 /// Clears all elements.
482 pub fn clear(&self) {
483 self.vec.write().clear();
484 }
485
486 /// Returns a copy of all elements.
487 pub fn to_vec(&self) -> Vec<T> {
488 self.vec.read().clone()
489 }
490}
491
492impl<T> Clone for ConcurrentVec<T>
493where
494 T: Clone + Send + Sync + 'static,
495{
496 fn clone(&self) -> Self {
497 Self {
498 vec: RwLock::new(self.vec.read().clone()),
499 }
500 }
501}
502
503// ============================================================================
504// StateAccessMetrics - Metrics for tracking state access patterns
505// ============================================================================
506
/// Counters describing how a piece of state is being accessed.
///
/// Tracks the total number of reads and writes, the number of accesses
/// currently in flight, and the highest in-flight count observed so far.
/// Each field is an independent atomic, so every method takes `&self`.
///
/// ## Example
///
/// ```rust,ignore
/// use spider_core::state::StateAccessMetrics;
///
/// let metrics = StateAccessMetrics::new();
/// metrics.record_read();
/// metrics.record_write();
/// println!("Reads: {}, Writes: {}", metrics.read_count(), metrics.write_count());
/// ```
#[derive(Debug, Default)]
pub struct StateAccessMetrics {
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    concurrent_access_peak: AtomicUsize,
    current_concurrent: AtomicUsize,
}

impl StateAccessMetrics {
    /// Creates a new metrics tracker with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a read access.
    pub fn record_read(&self) {
        self.read_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records a write access.
    pub fn record_write(&self) {
        self.write_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Records the start of an access (read or write).
    ///
    /// Increments the in-flight count and raises the recorded peak if the new
    /// in-flight count exceeds it.
    pub fn record_access_start(&self) {
        // `fetch_add` returns the value before the increment.
        let previous = self.current_concurrent.fetch_add(1, Ordering::AcqRel);
        let in_flight = previous.saturating_add(1);
        // `fetch_max` updates the peak in one atomic step. A separate load
        // followed by a single compare-exchange can fail when another thread
        // changes the peak in between, silently dropping this update.
        self.concurrent_access_peak
            .fetch_max(in_flight, Ordering::AcqRel);
    }

    /// Records the end of an access.
    ///
    /// Decrements the in-flight count, stopping at zero: a call without a
    /// matching [`record_access_start`](Self::record_access_start) (for
    /// example after [`reset`](Self::reset)) leaves the count at zero instead
    /// of wrapping around to `usize::MAX`.
    pub fn record_access_end(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched; that outcome is intentionally ignored.
        let _ = self.current_concurrent.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |in_flight| in_flight.checked_sub(1),
        );
    }

    /// Returns the total number of read accesses.
    pub fn read_count(&self) -> usize {
        self.read_count.load(Ordering::Acquire)
    }

    /// Returns the total number of write accesses.
    pub fn write_count(&self) -> usize {
        self.write_count.load(Ordering::Acquire)
    }

    /// Returns the peak concurrent access count.
    pub fn concurrent_access_peak(&self) -> usize {
        self.concurrent_access_peak.load(Ordering::Acquire)
    }

    /// Returns the current concurrent access count.
    pub fn current_concurrent(&self) -> usize {
        self.current_concurrent.load(Ordering::Acquire)
    }

    /// Resets all four counters to zero.
    ///
    /// The counters are stored one after another, so a concurrent reader may
    /// briefly observe some counters already reset and others not yet.
    pub fn reset(&self) {
        self.read_count.store(0, Ordering::Release);
        self.write_count.store(0, Ordering::Release);
        self.concurrent_access_peak.store(0, Ordering::Release);
        self.current_concurrent.store(0, Ordering::Release);
    }
}

impl Clone for StateAccessMetrics {
    /// Produces an independent tracker seeded with the values read at the
    /// time of the call. Each counter is loaded separately.
    fn clone(&self) -> Self {
        Self {
            read_count: AtomicUsize::new(self.read_count.load(Ordering::Acquire)),
            write_count: AtomicUsize::new(self.write_count.load(Ordering::Acquire)),
            concurrent_access_peak: AtomicUsize::new(
                self.concurrent_access_peak.load(Ordering::Acquire),
            ),
            current_concurrent: AtomicUsize::new(self.current_concurrent.load(Ordering::Acquire)),
        }
    }
}