palladium_runtime/
placement.rs1use std::sync::atomic::{AtomicU64, Ordering};
2
3use dashmap::DashMap;
4use palladium_actor::AddrHash;
5
6#[derive(Debug, Clone, PartialEq)]
10pub enum Placement {
11 Auto,
13 Core(usize),
17 ColocateWith(AddrHash),
22}
23
24pub type PlacementMap = DashMap<u64, usize>;
32
33pub fn default_core_for(hash: AddrHash, num_cores: usize) -> usize {
38 debug_assert!(num_cores > 0, "num_cores must be > 0");
39 (hash.path_hash() as usize) % num_cores
40}
41
/// A set of runtime counters, intended to be kept one per core.
///
/// The struct is aligned to 64 bytes (a common cache-line size), so separate
/// instances never share a 64-byte line — presumably to avoid false sharing
/// when different cores update their own counters.
///
/// All fields are atomics, so they can be updated through a shared reference.
#[derive(Debug)]
#[repr(align(64))]
pub struct CoreStats {
    /// Number of messages processed.
    pub messages_processed: AtomicU64,
    /// Number of actors spawned.
    pub actors_spawned: AtomicU64,
    /// Number of mailbox drops (presumably messages rejected or discarded by
    /// a mailbox — confirm against the code that increments it).
    pub mailbox_drops: AtomicU64,
}
54
55impl CoreStats {
56 pub fn new() -> Self {
57 Self {
58 messages_processed: AtomicU64::new(0),
59 actors_spawned: AtomicU64::new(0),
60 mailbox_drops: AtomicU64::new(0),
61 }
62 }
63
64 pub fn messages_processed(&self) -> u64 {
65 self.messages_processed.load(Ordering::Relaxed)
66 }
67
68 pub fn actors_spawned(&self) -> u64 {
69 self.actors_spawned.load(Ordering::Relaxed)
70 }
71
72 pub fn mailbox_drops(&self) -> u64 {
73 self.mailbox_drops.load(Ordering::Relaxed)
74 }
75}
76
77impl Default for CoreStats {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// `CoreStats` must keep the 64-byte alignment requested by `#[repr(align(64))]`.
    #[test]
    fn core_stats_cache_line_aligned() {
        assert_eq!(std::mem::align_of::<CoreStats>(), 64);
    }

    /// A freshly constructed `CoreStats` reports zero for every counter.
    #[test]
    fn core_stats_counters_start_at_zero() {
        let s = CoreStats::new();
        assert_eq!(s.messages_processed(), 0);
        assert_eq!(s.actors_spawned(), 0);
        assert_eq!(s.mailbox_drops(), 0);
    }

    /// Increments applied directly to the atomic fields are visible through the getters.
    #[test]
    fn core_stats_increments() {
        let s = CoreStats::new();
        s.messages_processed.fetch_add(5, Ordering::Relaxed);
        s.actors_spawned.fetch_add(2, Ordering::Relaxed);
        s.mailbox_drops.fetch_add(1, Ordering::Relaxed);
        assert_eq!(s.messages_processed(), 5);
        assert_eq!(s.actors_spawned(), 2);
        assert_eq!(s.mailbox_drops(), 1);
    }

    /// Hashing `N` distinct inputs onto `NUM_CORES` cores lands within 30 % of an
    /// even split on every core, and never yields an out-of-range index.
    #[test]
    fn default_core_for_distributes_evenly() {
        const N: usize = 10_000;
        const NUM_CORES: usize = 4;
        // A perfectly even split, and the accepted deviation from it (30 %).
        let expected = N / NUM_CORES;
        let tolerance = expected * 30 / 100;

        let mut counts = [0usize; NUM_CORES];
        for i in 0..N {
            let bytes = i.to_le_bytes();
            let hash = AddrHash::synthetic(&bytes);
            let core = default_core_for(hash, NUM_CORES);
            assert!(core < NUM_CORES, "core {core} out of range");
            counts[core] += 1;
        }

        for (c, &count) in counts.iter().enumerate() {
            assert!(
                (expected as isize - count as isize).unsigned_abs() <= tolerance,
                "core {c} count {count} is outside expected range [{}, {}]",
                expected - tolerance,
                expected + tolerance,
            );
        }
    }

    /// With a single core, every hash must map to core 0.
    #[test]
    fn default_core_for_single_core_always_zero() {
        for i in 0..100u64 {
            let bytes = i.to_le_bytes();
            let hash = AddrHash::synthetic(&bytes);
            assert_eq!(default_core_for(hash, 1), 0);
        }
    }

    /// Basic insert / lookup / remove round trip keyed by `path_hash()`.
    #[test]
    fn placement_map_insert_lookup_remove() {
        let map: PlacementMap = DashMap::new();
        let addr = AddrHash::synthetic(b"test-actor");

        map.insert(addr.path_hash(), 2);
        assert_eq!(map.get(&addr.path_hash()).map(|v| *v), Some(2));

        map.remove(&addr.path_hash());
        assert!(map.get(&addr.path_hash()).is_none());
    }

    /// Eight threads read the same populated map concurrently; the test passes
    /// if every reader thread finishes without panicking.
    #[test]
    fn placement_map_concurrent_reads() {
        use std::sync::Arc;

        let map = Arc::new(PlacementMap::new());

        // Populate the map up front; the reader threads below never write.
        let hashes: Vec<AddrHash> = (0..100u64)
            .map(|i| AddrHash::synthetic(&i.to_le_bytes()))
            .collect();
        for (i, &hash) in hashes.iter().enumerate() {
            map.insert(hash.path_hash(), i % 4);
        }

        let hashes = Arc::new(hashes);
        let threads: Vec<_> = (0..8)
            .map(|_| {
                let m = Arc::clone(&map);
                let h = Arc::clone(&hashes);
                std::thread::spawn(move || {
                    for i in 0..1000usize {
                        // Cycle through the pre-inserted hashes.
                        let hash = h[i % h.len()];
                        let _ = m.get(&hash.path_hash()).map(|v| *v);
                    }
                })
            })
            .collect();

        for t in threads {
            t.join().unwrap();
        }
    }
}