//! Concurrent access safety for agent operations
//!
//! Provides per-node locking to ensure safe concurrent access by multiple agents.
//! Read operations don't require locks (immutable data), but write operations
//! use per-node locks to prevent corruption.
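//!
//! # Example
//!
//! A minimal usage sketch (assuming `NodeID` is a 32-byte array, as in the
//! tests below; marked `ignore` because it relies on crate-internal types):
//!
//! ```ignore
//! let manager = NodeLockManager::new();
//! let node_id: NodeID = [0u8; 32];
//!
//! // Take the per-node write lock before mutating the node's data;
//! // agents working on other nodes are not blocked.
//! let lock = manager.get_lock(&node_id);
//! let _guard = lock.write();
//! // ... perform the mutation while the guard is held ...
//! ```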
use crate::types::NodeID;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
/// Per-node lock manager for concurrent access safety
///
/// Provides fine-grained locking at the node level, allowing multiple agents
/// to operate on different nodes concurrently while preventing conflicts on
/// the same node.
pub struct NodeLockManager {
    /// Map from NodeID to per-node read-write lock
    /// Uses Arc<RwLock<()>> to allow shared ownership and fine-grained locking
    locks: Arc<RwLock<HashMap<NodeID, Arc<RwLock<()>>>>>,
}
impl NodeLockManager {
    /// Create a new node lock manager
    pub fn new() -> Self {
        Self {
            locks: Arc::new(RwLock::new(HashMap::new())),
        }
    }
    /// Get or create a lock for a specific node
    ///
    /// Returns a shared `Arc` handle to the node's lock, which can be used to
    /// acquire read or write guards. Locks are created lazily on first access
    /// and retained in the map for the lifetime of the manager.
    fn get_node_lock(&self, node_id: &NodeID) -> Arc<RwLock<()>> {
        // Try to get existing lock (read lock for map lookup)
        {
            let map = self.locks.read();
            if let Some(lock) = map.get(node_id) {
                return lock.clone();
            }
        }
        // Lock doesn't exist, create it (write lock for map modification)
        let mut map = self.locks.write();
        // Double-check after acquiring the write lock (another thread might have
        // created it): `entry().or_insert_with()` only inserts if still missing
        map.entry(*node_id)
            .or_insert_with(|| Arc::new(RwLock::new(())))
            .clone()
    }
    /// Get the lock for a node
    ///
    /// Returns an `Arc` to the node's lock, which can be used to acquire
    /// read or write guards. Repeated calls with the same `NodeID` return
    /// handles to the same underlying lock, so all agents touching that node
    /// contend on it; the lock stays registered for the manager's lifetime.
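    ///
    /// # Example
    ///
    /// A sketch of read access (assuming `NodeID` is a 32-byte array; marked
    /// `ignore` because it relies on crate-internal types):
    ///
    /// ```ignore
    /// let manager = NodeLockManager::new();
    /// let node_id: NodeID = [0u8; 32];
    ///
    /// // Multiple agents may hold read guards on the same node concurrently.
    /// let lock = manager.get_lock(&node_id);
    /// let _read_guard = lock.read();
    /// ```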
    pub fn get_lock(&self, node_id: &NodeID) -> Arc<RwLock<()>> {
        self.get_node_lock(node_id)
    }
}
impl Default for NodeLockManager {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    #[test]
    fn test_concurrent_reads() {
        let manager = Arc::new(NodeLockManager::new());
        let node_id: NodeID = [1u8; 32];
        let counter = Arc::new(AtomicUsize::new(0));
        // Spawn multiple threads that all read-lock the same node
        let mut handles = vec![];
        for _ in 0..10 {
            let manager = manager.clone();
            let counter = counter.clone();
            let handle = thread::spawn(move || {
                let lock = manager.get_lock(&node_id);
                let _guard = lock.read();
                counter.fetch_add(1, Ordering::SeqCst);
            });
            handles.push(handle);
        }
        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }
        // All reads should have completed
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }
    #[test]
    fn test_write_excludes_other_writes() {
        let manager = Arc::new(NodeLockManager::new());
        let node_id: NodeID = [1u8; 32];
        let counter = Arc::new(AtomicUsize::new(0));
        // Spawn multiple threads that all write-lock the same node
        let mut handles = vec![];
        for _ in 0..5 {
            let manager = manager.clone();
            let counter = counter.clone();
            let handle = thread::spawn(move || {
                let lock = manager.get_lock(&node_id);
                let _guard = lock.write();
                let current = counter.load(Ordering::SeqCst);
                thread::yield_now(); // Give other threads a chance
                counter.store(current + 1, Ordering::SeqCst);
            });
            handles.push(handle);
        }
        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }
        // All writes should have completed sequentially (no lost updates)
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }
    #[test]
    fn test_different_nodes_dont_block() {
        let manager = Arc::new(NodeLockManager::new());
        let node_id1: NodeID = [1u8; 32];
        let node_id2: NodeID = [2u8; 32];
        let counter = Arc::new(AtomicUsize::new(0));
        // Spawn threads that lock different nodes
        let mut handles = vec![];
        for i in 0..5 {
            let manager = manager.clone();
            let counter = counter.clone();
            let node_id = if i % 2 == 0 { node_id1 } else { node_id2 };
            let handle = thread::spawn(move || {
                let lock = manager.get_lock(&node_id);
                let _guard = lock.write();
                counter.fetch_add(1, Ordering::SeqCst);
            });
            handles.push(handle);
        }
        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }
        // All operations should complete (different nodes don't block each other)
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }
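    // Sketch: a writer on a node should exclude readers of that node. This
    // relies on parking_lot's `RwLock::try_read`, which returns `None` while
    // a write guard is held.
    #[test]
    fn test_write_excludes_reads() {
        let manager = NodeLockManager::new();
        let node_id: NodeID = [3u8; 32];
        let lock = manager.get_lock(&node_id);
        // Hold a write guard on the node.
        let _write_guard = lock.write();
        // A read attempt on the same node fails while the write guard is held.
        assert!(lock.try_read().is_none());
    }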
}