//! Thread synchronization and pinning.
//!
//! # Registration
//!
//! In order to track all threads in one place, we need some form of thread registration. Every
//! thread has a thread-local so-called "harness" that registers it the first time it is pinned,
//! and unregisters it when it exits.
//!
//! Registered threads are tracked in a global lock-free singly-linked list of thread entries. The
//! head of this list is accessed by calling the `participants` function.
//!
//! # Thread entries
//!
//! Thread entries are implemented as the `Thread` data type. Every entry contains an integer that
//! tells whether the thread is pinned and, if so, what the global epoch was at the time it was
//! pinned. Entries also hold a pin counter that aids in periodic global epoch advancement.

use std::cell::Cell;
use std::mem;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::{Relaxed, Release, SeqCst};

use epoch::Atomic;
use epoch::garbage::{self, Bag, EPOCH};

thread_local! {
    /// The thread registration harness.
    ///
    /// The harness is lazily initialized on its first use. Initialization performs registration.
    /// If initialized, the harness will get destructed on thread exit, which in turn unregisters
    /// the thread.
    static HARNESS: Harness = Harness {
        thread: Thread::register(),
        is_pinned: Cell::new(false),
        pin_count: Cell::new(0),
        bag: Cell::new(Box::into_raw(Box::new(Bag::new()))),
    };
}

/// Holds thread-local data and unregisters the thread when dropped.
struct Harness {
    /// This thread's entry in the participants list.
    thread: *const Thread,
    /// Whether the thread is currently pinned.
    is_pinned: Cell<bool>,
    /// Total number of pinnings performed.
    pin_count: Cell<usize>,
    /// The local bag of objects that will be later freed.
    bag: Cell<*mut Bag>,
}

impl Drop for Harness {
    fn drop(&mut self) {
        // Now that the thread is exiting, we must move the local bag into the global garbage
        // queue. Also, let's try advancing the epoch and help free some garbage.

        let thread = unsafe { &*self.thread };

        // If we called `pin()` here, it would try to access `HARNESS` and then panic.
        // To work around the problem, we manually pin the thread.
        let pin = &Pin { bag: &self.bag };
        thread.set_pinned(pin);

        // Spare some cycles on garbage collection.
        // Note: This may itself produce garbage and in turn allocate new bags.
        try_advance(pin);
        garbage::collect(pin);

        // Push the local bag into the global garbage queue.
        let bag = unsafe { Box::from_raw(self.bag.get()) };
        garbage::push(bag, pin);

        // Manually unpin the thread.
        thread.set_unpinned();

        // Mark the thread entry as deleted.
        thread.unregister();
    }
}

/// An entry in the linked list of participating threads.
struct Thread {
    /// The least significant bit is set if the thread is currently pinned. The rest of the bits
    /// encode the current epoch.
    state: AtomicUsize,
    /// The next thread in the linked list of participants. If the tag is 1, this entry is deleted
    /// and can be unlinked from the list.
    next: Atomic<Thread>,
}
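
// For illustration only: a minimal sketch of the `state` encoding described above. The global
// epoch always advances in steps of 2, so the least significant bit of `state` is free to serve
// as the "pinned" flag while the remaining bits carry the epoch. This module is not used by the
// implementation; it merely restates the bit manipulation performed by `set_pinned` and
// `try_advance`.
#[cfg(test)]
mod state_encoding_sketch {
    #[test]
    fn pack_and_unpack() {
        let epoch: usize = 6; // some even global epoch
        let state = epoch | 1; // what `set_pinned` stores: the epoch plus the pinned bit

        assert_eq!(state & 1, 1); // how `try_advance` reads the pinned flag
        assert_eq!(state & !1, epoch); // how `try_advance` recovers the epoch
    }
}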
impl Thread {
    /// Marks the thread as pinned.
    ///
    /// Must not be called if the thread is already pinned!
    #[inline]
    fn set_pinned(&self, _pin: &Pin) {
        let epoch = EPOCH.load(Relaxed);
        let state = epoch | 1;

        // Now we must store `state` into `self.state`. It's important that any succeeding loads
        // don't get reordered with this store. In other words, this thread's epoch must be fully
        // announced to other threads. Only then does it become safe to load from the shared
        // memory.
        if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
            // On x86 architectures we have a choice:
            // 1. `atomic::fence(SeqCst)`, which compiles to a `mfence` instruction.
            // 2. `compare_and_swap(_, _, SeqCst)`, which compiles to a `lock cmpxchg` instruction.
            //
            // Both instructions have the effect of a full barrier, but the second one seems to be
            // faster in this particular case.
            let previous = self.state.load(Relaxed);
            self.state.compare_and_swap(previous, state, SeqCst);
        } else {
            self.state.store(state, Relaxed);
            ::std::sync::atomic::fence(SeqCst);
        }
    }

    /// Marks the thread as unpinned.
    #[inline]
    fn set_unpinned(&self) {
        // Clear the last bit.
        // We don't need to preserve the epoch, so just store the number zero.
        self.state.store(0, Release);
    }

    /// Registers a thread by adding a new entry to the list of participating threads.
    ///
    /// Returns a pointer to the newly allocated entry.
    fn register() -> *mut Thread {
        let list = participants();

        let mut new = Box::new(Thread {
            state: AtomicUsize::new(0),
            next: Atomic::null(0),
        });

        // This code is executing while the thread harness is initializing, so normal pinning
        // would try to access it while it is being initialized. Such accesses fail with a panic.
        // We must therefore cheat by creating a fake pin.
        let pin = unsafe { &mem::zeroed::<Pin>() };

        let mut head = list.load(pin);
        loop {
            new.next.store(head);

            // Try installing this thread's entry as the new head.
            match list.cas_box(head, new, 0) {
                Ok(n) => return n.as_raw(),
                Err((h, n)) => {
                    head = h;
                    new = n;
                }
            }
        }
    }

    /// Unregisters the thread by marking its entry as deleted.
    ///
    /// This function doesn't physically remove the entry from the linked list, though. That will
    /// be done by a future call to `try_advance`.
    fn unregister(&self) {
        // This code is executing while the thread harness is being destructed, so normal pinning
        // would try to access it mid-destruction. Such accesses fail with a panic. We must
        // therefore cheat by creating a fake pin.
        let pin = unsafe { &mem::zeroed::<Pin>() };

        // Simply mark the next-pointer in this thread's entry.
        let mut next = self.next.load(pin);
        while next.tag() == 0 {
            match self.next.cas(next, next.with_tag(1)) {
                Ok(()) => break,
                Err(n) => next = n,
            }
        }
    }
}

/// Returns a reference to the head pointer of the list of participating threads.
fn participants() -> &'static Atomic<Thread> {
    static PARTICIPANTS: AtomicUsize = ATOMIC_USIZE_INIT;
    unsafe { &*(&PARTICIPANTS as *const _ as *const _) }
}

/// Attempts to advance the global epoch.
///
/// The global epoch can advance only if all currently pinned threads have been pinned in the
/// current epoch.
#[cold]
pub fn try_advance(pin: &Pin) {
    let epoch = EPOCH.load(SeqCst);

    // Traverse the linked list of participating threads.
    let mut pred = participants();
    let mut curr = pred.load(pin);

    while let Some(c) = curr.as_ref() {
        let succ = c.next.load(pin);

        if succ.tag() == 1 {
            // This thread has exited. Try unlinking it from the list.
            let succ = succ.with_tag(0);

            if pred.cas(curr, succ).is_err() {
                // We lost the race to unlink the thread. Usually that means we should traverse
                // the list again from the beginning, but since another thread trying to advance
                // the epoch has won the race, we leave the job to that one.
                return;
            }

            // The unlinked entry can later be freed.
            unsafe { defer_free(c as *const _ as *mut Thread, 1, pin) }

            // Move forward, but don't change the predecessor.
            curr = succ;
        } else {
            let thread_state = c.state.load(SeqCst);
            let thread_is_pinned = thread_state & 1 == 1;
            let thread_epoch = thread_state & !1;

            // If the thread was pinned in a different epoch, we cannot advance the global epoch
            // just yet.
            if thread_is_pinned && thread_epoch != epoch {
                return;
            }

            // Move one step forward.
            pred = &c.next;
            curr = succ;
        }
    }

    // All pinned threads were pinned in the current global epoch.
    // Finally, try advancing the epoch. We increment by 2 and simply wrap around on overflow.
    EPOCH.compare_and_swap(epoch, epoch.wrapping_add(2), SeqCst);
}
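
// For illustration only: a hedged sketch of the pointer tagging used for logical deletion.
// `unregister` marks an exited thread by setting the tag of its `next` pointer to 1, and
// `try_advance` later unlinks such entries. Because `Thread` is word-aligned, the low bit of any
// pointer to it is zero and can carry the flag. The address below is hypothetical; this sketch
// demonstrates only the bit manipulation, not the `Atomic` API itself.
#[cfg(test)]
mod pointer_tag_sketch {
    #[test]
    fn tag_in_low_bit() {
        let addr: usize = 0x1000; // a hypothetical word-aligned address
        let tagged = addr | 1; // tag = 1 marks the entry as deleted

        assert_eq!(tagged & 1, 1); // the "deleted" flag is observable...
        assert_eq!(tagged & !1, addr); // ...and the original address is recoverable
    }
}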
/// A witness that the current thread is pinned.
///
/// A reference to `Pin` is proof that the current thread is pinned. Lots of methods that interact
/// with [`Atomic`]s can safely be called only while the thread is pinned, so they often require a
/// reference to `Pin`.
///
/// This data type is inherently bound to the thread that created it, therefore it implements
/// neither `Send` nor `Sync`.
///
/// [`Atomic`]: struct.Atomic.html
#[derive(Debug)]
pub struct Pin {
    /// A pointer to the cell within the harness, which holds a pointer to the local bag.
    ///
    /// This pointer is kept within `Pin` as a matter of convenience. It could also be reached
    /// through the harness itself, but that doesn't work if we're in the middle of its
    /// destruction.
    bag: *const Cell<*mut Bag>, // !Send + !Sync
}

/// Pins the current thread.
///
/// The provided function takes a reference to a `Pin`, which can be used to interact with
/// [`Atomic`]s. The pin serves as a proof that whatever data you load from an [`Atomic`] will not
/// be concurrently deleted by another thread while the pin is alive.
///
/// Note that keeping a thread pinned for a long time prevents memory reclamation of any newly
/// deleted objects protected by [`Atomic`]s. The provided function should be very quick -
/// generally speaking, it shouldn't take more than 100 ms.
///
/// Pinning is reentrant. There is no harm in pinning a thread while it's already pinned
/// (repinning is essentially a no-op).
///
/// Pinning itself comes with a price: it begins with a `SeqCst` fence and performs a few other
/// atomic operations. However, this mechanism is designed to be as performant as possible, so it
/// can be used pretty liberally. On a modern machine pinning takes 10 to 15 nanoseconds.
///
/// [`Atomic`]: struct.Atomic.html
pub fn pin<F, T>(f: F) -> T
    where F: FnOnce(&Pin) -> T
{
    /// Number of pinnings after which a thread will collect some global garbage.
    const PINS_BETWEEN_COLLECT: usize = 128;

    HARNESS.with(|harness| {
        let thread = unsafe { &*harness.thread };
        let pin = &Pin { bag: &harness.bag };

        let was_pinned = harness.is_pinned.get();
        if !was_pinned {
            // Pin the thread.
            harness.is_pinned.set(true);
            thread.set_pinned(pin);

            // Increment the pin counter.
            let count = harness.pin_count.get();
            harness.pin_count.set(count.wrapping_add(1));

            // If the counter progressed enough, try advancing the epoch and collecting garbage.
            if count % PINS_BETWEEN_COLLECT == 0 {
                try_advance(pin);
                garbage::collect(pin);
            }
        }

        // This will unpin the thread even if `f` panics.
        defer! {
            if !was_pinned {
                // Unpin the thread.
                thread.set_unpinned();
                harness.is_pinned.set(false);
            }
        }

        f(pin)
    })
}

/// Returns `true` if the current thread is pinned.
#[inline]
pub fn is_pinned() -> bool {
    HARNESS.with(|harness| harness.is_pinned.get())
}
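
// For illustration only: a hedged usage sketch of the public pinning API. `pin` passes a `&Pin`
// witness to the closure and forwards the closure's return value; because pinning is reentrant,
// the nested call below is a harmless no-op.
#[cfg(test)]
mod pin_usage_sketch {
    use epoch;

    #[test]
    fn nested_pin_returns_value() {
        let answer = epoch::pin(|_outer| {
            // Repinning while already pinned doesn't pin again; it only runs the closure.
            epoch::pin(|_inner| 42)
        });
        assert_eq!(answer, 42);
    }
}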
/// Stashes away an object that will later be freed.
///
/// The specified object is an array allocated at address `object`, consisting of `count` elements
/// of type `T`.
///
/// This function inserts the object into a thread-local buffer. When the buffer becomes full, its
/// objects are flushed into the globally shared [`Garbage`] instance.
///
/// If the object is unusually large, it is wise to follow up with a call to [`flush`] so that it
/// doesn't get stuck waiting in the buffer for a long time.
///
/// [`Garbage`]: struct.Garbage.html
/// [`flush`]: fn.flush.html
pub unsafe fn defer_free<T>(object: *mut T, count: usize, pin: &Pin) {
    unsafe fn free<T>(ptr: *mut T, count: usize) {
        // Free the memory, but don't run the destructors.
        drop(Vec::from_raw_parts(ptr, 0, count));
    }

    loop {
        // Get the thread-local bag.
        let cell = &*pin.bag;
        let bag = cell.get();

        // Try inserting the object into the bag.
        if (*bag).try_insert(free::<T>, object, count) {
            // Success! We're done.
            break;
        }

        // Flush the garbage and create a new bag.
        flush(pin);
    }
}

/// Flushes the buffered thread-local garbage.
///
/// It is wise to flush the garbage just after passing a very large object to [`defer_free`], so
/// that it isn't sitting in the buffer for a long time.
///
/// [`defer_free`]: fn.defer_free.html
pub fn flush(pin: &Pin) {
    unsafe {
        // Get the thread-local bag.
        let cell = &*pin.bag;
        let bag = cell.get();

        if !(*bag).is_empty() {
            // The bag is non-empty, so replace it with a fresh one.
            cell.set(Box::into_raw(Box::new(Bag::new())));

            // Push the old bag into the garbage queue.
            let bag = Box::from_raw(bag);
            garbage::push(bag, pin);

            // Spare some cycles on garbage collection.
            // Note: This may itself produce garbage and allocate new bags.
            try_advance(pin);
            garbage::collect(pin);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::thread;
    use std::sync::atomic::Ordering::SeqCst;

    use epoch;
    use epoch::garbage::EPOCH;
    use epoch::thread::{HARNESS, try_advance};

    #[test]
    fn pin_reentrant() {
        assert!(!epoch::is_pinned());
        epoch::pin(|_| {
            assert!(epoch::is_pinned());
            epoch::pin(|_| {
                assert!(epoch::is_pinned());
            });
            assert!(epoch::is_pinned());
        });
        assert!(!epoch::is_pinned());
    }

    #[test]
    fn flush_local_garbage() {
        for _ in 0..100 {
            epoch::pin(|pin| {
                unsafe {
                    let a = Box::into_raw(Box::new(7));
                    epoch::defer_free(a, 1, pin);

                    HARNESS.with(|h| {
                        assert!(!(*h.bag.get()).is_empty());

                        while !(*h.bag.get()).is_empty() {
                            epoch::flush(pin);
                        }
                    });
                }
            });
        }
    }

    #[test]
    fn garbage_buffering() {
        HARNESS.with(|h| unsafe {
            while !(*h.bag.get()).is_empty() {
                epoch::pin(|pin| epoch::flush(pin));
            }

            epoch::pin(|pin| {
                for _ in 0..10 {
                    let a = Box::into_raw(Box::new(7));
                    epoch::defer_free(a, 1, pin);
                }
                assert!(!(*h.bag.get()).is_empty());
            });
        });
    }

    #[test]
    fn pin_holds_advance() {
        let threads = (0..8).map(|_| {
            thread::spawn(|| {
                for _ in 0..500_000 {
                    epoch::pin(|pin| {
                        let before = EPOCH.load(SeqCst);
                        try_advance(pin);
                        let after = EPOCH.load(SeqCst);

                        assert!(after.wrapping_sub(before) <= 2);
                    });
                }
            })
        }).collect::<Vec<_>>();

        for t in threads {
            t.join().unwrap();
        }
    }
}
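
// For illustration only: a hedged sketch of the pattern recommended in the `defer_free` docs.
// After deferring an unusually large allocation, follow up with `flush` so the object doesn't
// linger in the thread-local buffer. The 4096-byte array here merely stands in for "unusually
// large"; no exact threshold is part of the API.
#[cfg(test)]
mod flush_after_large_defer_sketch {
    use epoch;

    #[test]
    fn defer_then_flush() {
        epoch::pin(|pin| unsafe {
            // One allocation holding a single large array element.
            let big = Box::into_raw(Box::new([0u8; 4096]));
            epoch::defer_free(big, 1, pin);

            // Push the buffered garbage into the global queue right away.
            epoch::flush(pin);
        });
    }
}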