Skip to main content

palladium_runtime/
placement.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use dashmap::DashMap;
4use palladium_actor::AddrHash;
5
6// ── Placement ─────────────────────────────────────────────────────────────────
7
8/// Specifies where a newly spawned actor should be placed.
9#[derive(Debug, Clone, PartialEq)]
10pub enum Placement {
11    /// Engine chooses the core using `hash(ActorPath) % num_cores`.
12    Auto,
13    /// Pin the actor to a specific core index.
14    ///
15    /// Panics at spawn time if `core_id >= num_cores`.
16    Core(usize),
17    /// Place the actor on the same core as an existing actor.
18    ///
19    /// Returns `SpawnError::ColocateTargetNotFound` if the target
20    /// `AddrHash` is not registered in the placement map.
21    ColocateWith(AddrHash),
22}
23
24// ── PlacementMap ──────────────────────────────────────────────────────────────
25
26/// Shared, concurrent map from path hash (u64) to core index.
27///
28/// Written only at actor spawn/stop time (rare).
29/// Read on every cross-core send (frequent).
30/// [`DashMap`] provides shard-based concurrent access with no global lock.
31pub type PlacementMap = DashMap<u64, usize>;
32
33// ── default_core_for ──────────────────────────────────────────────────────────
34
35/// Hash-based default placement: distributes actors across cores by taking the
36/// path hash bits of the `AddrHash` modulo `num_cores`.
37pub fn default_core_for(hash: AddrHash, num_cores: usize) -> usize {
38    debug_assert!(num_cores > 0, "num_cores must be > 0");
39    (hash.path_hash() as usize) % num_cores
40}
41
42// ── CoreStats ─────────────────────────────────────────────────────────────────
43
/// Per-core statistics counters.
///
/// `#[repr(align(64))]` ensures each instance occupies its own cache line,
/// preventing false sharing when cores update their own stats concurrently.
///
/// The fields are public atomics so the owning core can bump them directly
/// (e.g. with `fetch_add`); the accessor methods provide relaxed snapshot
/// reads for observers.
#[derive(Debug)]
#[repr(align(64))]
pub struct CoreStats {
    /// Counter of processed messages.
    pub messages_processed: AtomicU64,
    /// Counter of spawned actors.
    pub actors_spawned: AtomicU64,
    /// Counter of mailbox drops.
    pub mailbox_drops: AtomicU64,
}

impl CoreStats {
    /// Creates a stats block with every counter set to zero.
    ///
    /// This is a `const fn`, so it can also initialise `static` and `const`
    /// items.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            messages_processed: AtomicU64::new(0),
            actors_spawned: AtomicU64::new(0),
            mailbox_drops: AtomicU64::new(0),
        }
    }

    /// Returns the current value of the `messages_processed` counter.
    ///
    /// The load uses `Ordering::Relaxed`: the value is a point-in-time
    /// snapshot and is not synchronised with the other counters.
    #[must_use]
    pub fn messages_processed(&self) -> u64 {
        self.messages_processed.load(Ordering::Relaxed)
    }

    /// Returns the current value of the `actors_spawned` counter
    /// (relaxed load, see [`CoreStats::messages_processed`]).
    #[must_use]
    pub fn actors_spawned(&self) -> u64 {
        self.actors_spawned.load(Ordering::Relaxed)
    }

    /// Returns the current value of the `mailbox_drops` counter
    /// (relaxed load, see [`CoreStats::messages_processed`]).
    #[must_use]
    pub fn mailbox_drops(&self) -> u64 {
        self.mailbox_drops.load(Ordering::Relaxed)
    }
}

impl Default for CoreStats {
    /// Equivalent to [`CoreStats::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
82
#[cfg(test)]
mod tests {
    use super::*;

    /// `CoreStats` must keep its 64-byte alignment.
    #[test]
    fn core_stats_cache_line_aligned() {
        assert_eq!(std::mem::align_of::<CoreStats>(), 64);
    }

    /// A freshly constructed `CoreStats` reports zero for every counter.
    #[test]
    fn core_stats_counters_start_at_zero() {
        let s = CoreStats::new();
        assert_eq!(s.messages_processed(), 0);
        assert_eq!(s.actors_spawned(), 0);
        assert_eq!(s.mailbox_drops(), 0);
    }

    /// Increments applied through the public atomics are visible through
    /// the accessor methods.
    #[test]
    fn core_stats_increments() {
        let s = CoreStats::new();
        s.messages_processed.fetch_add(5, Ordering::Relaxed);
        s.actors_spawned.fetch_add(2, Ordering::Relaxed);
        s.mailbox_drops.fetch_add(1, Ordering::Relaxed);
        assert_eq!(s.messages_processed(), 5);
        assert_eq!(s.actors_spawned(), 2);
        assert_eq!(s.mailbox_drops(), 1);
    }

    #[test]
    fn default_core_for_distributes_evenly() {
        // Generate 10_000 synthetic AddrHashes and verify none is placed
        // out of range, and the distribution across 4 cores is roughly even
        // (within ±30% of expected 2500 per core).
        const N: usize = 10_000;
        const NUM_CORES: usize = 4;
        let expected = N / NUM_CORES;
        let tolerance = expected * 30 / 100; // ±30%

        let mut counts = [0usize; NUM_CORES];
        for i in 0..N {
            let bytes = i.to_le_bytes();
            let hash = AddrHash::synthetic(&bytes);
            let core = default_core_for(hash, NUM_CORES);
            assert!(core < NUM_CORES, "core {core} out of range");
            counts[core] += 1;
        }

        for (c, &count) in counts.iter().enumerate() {
            assert!(
                expected.abs_diff(count) <= tolerance,
                "core {c} count {count} is outside expected range [{}, {}]",
                expected - tolerance,
                expected + tolerance,
            );
        }
    }

    /// With a single core every hash must map to core 0.
    #[test]
    fn default_core_for_single_core_always_zero() {
        for i in 0..100u64 {
            let bytes = i.to_le_bytes();
            let hash = AddrHash::synthetic(&bytes);
            assert_eq!(default_core_for(hash, 1), 0);
        }
    }

    /// Basic insert / lookup / remove round trip on the placement map.
    #[test]
    fn placement_map_insert_lookup_remove() {
        let map: PlacementMap = DashMap::new();
        let addr = AddrHash::synthetic(b"test-actor");

        map.insert(addr.path_hash(), 2);
        assert_eq!(map.get(&addr.path_hash()).map(|v| *v), Some(2));

        map.remove(&addr.path_hash());
        assert!(map.get(&addr.path_hash()).is_none());
    }

    #[test]
    fn placement_map_concurrent_reads() {
        use std::sync::Arc;

        let map = Arc::new(PlacementMap::new());

        // Pre-populate 100 actors, spread round-robin over cores 0..4.
        let hashes: Vec<AddrHash> = (0..100u64)
            .map(|i| AddrHash::synthetic(&i.to_le_bytes()))
            .collect();
        for (i, &hash) in hashes.iter().enumerate() {
            map.insert(hash.path_hash(), i % 4);
        }

        // 8 reader threads each do 1000 lookups — no panics or data races,
        // and every pre-populated key must be found. Only presence is
        // checked (not the core value) so the test stays valid even if two
        // synthetic hashes happen to share a path hash.
        let hashes = Arc::new(hashes);
        let threads: Vec<_> = (0..8)
            .map(|_| {
                let m = Arc::clone(&map);
                let h = Arc::clone(&hashes);
                std::thread::spawn(move || {
                    for i in 0..1000usize {
                        let hash = h[i % 100];
                        assert!(
                            m.get(&hash.path_hash()).is_some(),
                            "pre-populated entry missing during concurrent read",
                        );
                    }
                })
            })
            .collect();

        for t in threads {
            t.join().unwrap();
        }
    }
}