use crate::cell::cell::{new_actor_cell_owner, new_share_cell_owner};
use crate::cell::cell::{ActorCellMaker, ActorCellOwner, ShareCellOwner};
use crate::queue::FnOnceQueue;
use crate::timers::Timers;
use crate::waker::WakeHandlers;
use crate::{
    Deferrer, FixedTimerKey, LogFilter, LogID, LogLevel, LogRecord, LogVisitor, MaxTimerKey,
    MinTimerKey, Share, StopCause, Waker,
};
use std::collections::VecDeque;
use std::fmt::Arguments;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::time::{Duration, Instant, SystemTime};

/// The external interface to the actor runtime
///
/// This contains all the queues and timers, and controls access to
/// the state of all the actors.  It also provides the interface to
/// control all this from outside the runtime, i.e. it provides the
/// calls used by an event loop.  The [`Stakker`] instance itself is
/// not accessible from actors due to borrowing restrictions.  It
/// derefs to a [`Core`] reference through auto-deref or `*stakker`.
///
/// [`Core`]: struct.Core.html
/// [`Stakker`]: struct.Stakker.html
pub struct Stakker {
    pub(crate) core: Core,
    pub(crate) actor_owner: ActorCellOwner,
    alt_queues: Option<(FnOnceQueue<Stakker>, FnOnceQueue<Stakker>)>,
    recreate_queues_time: Instant,
}

impl Stakker {
    /// Construct a [`Stakker`] instance.  Whether more than one
    /// instance can be created in each process or thread depends on
    /// the [Cargo features](index.html#cargo-features) enabled.
    ///
    /// [`Stakker`]: struct.Stakker.html
    pub fn new(now: Instant) -> Self {
        FnOnceQueue::<Stakker>::sanity_check();
        // Do this first to get the uniqueness checks to fail early
        let (actor_owner, actor_maker) = new_actor_cell_owner();
        Self {
            core: Core::new(now, actor_maker),
            actor_owner,
            alt_queues: Some((FnOnceQueue::new(), FnOnceQueue::new())),
            recreate_queues_time: now + Duration::from_secs(60),
        }
    }

    /// Return the next timer expiry time, or `None` if there are no
    /// timers
    pub fn next_expiry(&mut self) -> Option<Instant> {
        self.core.timers.next_expiry()
    }

    /// Return how long we need to wait for the next timer, or `None`
    /// if there are no timers to wait for
    pub fn next_wait(&mut self, now: Instant) -> Option<Duration> {
        self.core
            .timers
            .next_expiry()
            .map(|t| t.saturating_duration_since(now))
    }

    /// Return how long to wait for the next I/O poll.  If there are
    /// idle items queued (`idle_pending` is true), return 0 seconds,
    /// which allows the caller to quickly check for more I/O and then
    /// run the idle queue if there is nothing to do.  If there is a
    /// timer active, return the time to wait for that timer, limited
    /// by `maxdur`.  If there is nothing to wait for, just return
    /// `maxdur`.
    pub fn next_wait_max(
        &mut self,
        now: Instant,
        maxdur: Duration,
        idle_pending: bool,
    ) -> Duration {
        if idle_pending {
            Duration::from_secs(0)
        } else {
            self.core
                .timers
                .next_expiry()
                .map(|t| t.saturating_duration_since(now).min(maxdur))
                .unwrap_or(maxdur)
        }
    }

    /// Move time forward, expire any timers onto the main
    /// [`Deferrer`] queue, then run the main and lazy queues until
    /// there is nothing outstanding.  Returns `true` if there are
    /// idle items still to run.
    ///
    /// If `idle` is true, then an item from the idle queue is run as
    /// well.  This should be set only if we've already run the queues
    /// and just polled I/O (without waiting) and there is still
    /// nothing to do.
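    ///
    /// An illustrative main-loop skeleton (a sketch only: `poll_io`
    /// is a hypothetical stand-in for the I/O polling
    /// implementation, not part of this crate):
    ///
    /// ```ignore
    /// let mut idle_pending = stakker.run(Instant::now(), false);
    /// while stakker.not_shutdown() {
    ///     let maxdur = Duration::from_secs(10);
    ///     let dur = stakker.next_wait_max(Instant::now(), maxdur, idle_pending);
    ///     let activity = poll_io(dur); // true if I/O events were handled
    ///     idle_pending = stakker.run(Instant::now(), !activity);
    /// }
    /// ```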
    ///
    /// Note: All actors should use `cx.now()` to get the time, which
    /// allows the entire system to be run in virtual time (unrelated
    /// to real time) if necessary.
    ///
    /// [`Deferrer`]: struct.Deferrer.html
    pub fn run(&mut self, now: Instant, idle: bool) -> bool {
        if idle {
            if let Some(cb) = self.idle_queue.pop_front() {
                cb(self);
            }
        }

        // Swap between normal and alt queues in an attempt to keep
        // using the same memory, which should be more
        // cache-efficient.  However if there has been a big burst of
        // activity, the queues might have grown huge.  So
        // periodically force the queues to be recreated.
        //
        // It's necessary to swap out queues before executing them,
        // because whilst executing, more items may be added to any of
        // them.  (Also the borrow checker complains.)

        // Run main queue and timers
        let (mut alt_main, mut alt_lazy) =
            (self.alt_queues.take()).expect("Previous run call must have panicked");
        self.core.deferrer.swap_queue(&mut alt_main);
        if now > self.core.now {
            self.core.now = now;
            self.core.timers.advance(now, &mut alt_main);
        }
        alt_main.execute(self);

        // Keep running main and lazy queues until exhaustion
        loop {
            self.core.deferrer.swap_queue(&mut alt_main);
            if !alt_main.is_empty() {
                alt_main.execute(self);
                continue;
            }
            mem::swap(&mut self.core.lazy_queue, &mut alt_lazy);
            if !alt_lazy.is_empty() {
                alt_lazy.execute(self);
                continue;
            }
            break;
        }
        self.alt_queues = Some((alt_main, alt_lazy));

        // Recreate the queues?  They will all be empty at this point.
        if now > self.recreate_queues_time {
            self.alt_queues = Some((FnOnceQueue::new(), FnOnceQueue::new()));
            self.core.lazy_queue = FnOnceQueue::new();
            self.core.deferrer.set_queue(FnOnceQueue::new());
            self.recreate_queues_time = now + Duration::from_secs(60);
        }

        !self.core.idle_queue.is_empty()
    }

    /// Set the current `SystemTime`, for use in a virtual-time main
    /// loop.  If `None` is passed, then `core.systime()` just calls
    /// `SystemTime::now()`.
    /// Otherwise `core.systime()` returns the provided `SystemTime`
    /// instead.
    #[inline]
    pub fn set_systime(&mut self, systime: Option<SystemTime>) {
        self.core.systime = systime;
    }

    /// Set the logger and logging level
    ///
    /// The provided logger will be called synchronously every time a
    /// [`Core::log`] call is made, if the logging level is enabled.
    /// It is provided with a [`Core`] reference, so it can access a
    /// [`Share`], or defer calls to actors as necessary.  It may
    /// alternatively choose to forward the logging to an external
    /// log framework, such as the `log` or `tracing` crates.
    ///
    /// The enabled logging levels are described by `filter`.
    /// Typically you'd set something like
    /// `LogFilter::all(&[LogLevel::Info, LogLevel::Audit,
    /// LogLevel::Open])` or `LogFilter::from_str("info,audit,open")`,
    /// which enable info and above, plus audit and span open/close.
    /// See [`LogFilter`].
    ///
    /// Note that the **logger** feature must be enabled for this call
    /// to succeed.  The **Stakker** crate provides only the core
    /// logging functionality.  It adds a 64-bit logging-ID to each
    /// actor and logs actor startup and termination.  It provides the
    /// framework for logging formatted text and key-value pairs along
    /// with an actor's logging-ID for context.  An external crate
    /// like `stakker_log` may be used to provide macros that allow
    /// convenient logging from actor code and to allow interfacing to
    /// external logging systems.
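    ///
    /// An illustrative logger that writes to stderr (a sketch;
    /// assumes the **logger** feature is enabled and that `LogLevel`
    /// implements `Debug`):
    ///
    /// ```ignore
    /// stakker.set_logger(
    ///     LogFilter::from_str("info,audit,open").unwrap(),
    ///     |_core, rec| eprintln!("[{:?}] {}", rec.level, rec.fmt),
    /// );
    /// ```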
    ///
    /// [`Core::log`]: struct.Core.html#method.log
    /// [`Core`]: struct.Core.html
    /// [`LogFilter`]: struct.LogFilter.html
    /// [`Share`]: struct.Share.html
    #[inline]
    #[allow(unused_variables)]
    pub fn set_logger(
        &mut self,
        filter: LogFilter,
        logger: impl FnMut(&mut Core, &LogRecord<'_>) + 'static,
    ) {
        #[cfg(feature = "logger")]
        {
            self.log_filter = filter;
            self.logger = Some(Box::new(logger));
        }
        #[cfg(not(feature = "logger"))]
        panic!("Enable 'logger' feature before setting a logger");
    }

    /// Used to provide **Stakker** with a means to wake the main
    /// thread from another thread.  This enables [`Waker`] and
    /// associated functionality.  A poll-waker is not required
    /// otherwise.
    ///
    /// Normally the main thread will be blocked waiting for I/O
    /// events most of the time, with a timeout to handle the
    /// next-expiring timer.  If **Stakker** code running in another
    /// thread wants to defer a call to the main thread, it needs a
    /// way to interrupt that blocked call.  This is done by creating
    /// an artificial I/O event.  (For example, `mio` handles this
    /// with a `mio::Waker` instance, which wraps various
    /// platform-specific ways of creating an artificial I/O event.)
    ///
    /// So **Stakker** calls the `waker` provided to this call, which
    /// causes the I/O polling implementation to trigger an artificial
    /// I/O event, which in turn results in the I/O polling
    /// implementation calling `Stakker::poll_wake()`.
    ///
    /// Normally the poll-waker will be set up automatically by the
    /// user's chosen I/O polling implementation (for example
    /// `stakker_mio`) on initialisation.
    ///
    /// [`Waker`]: struct.Waker.html
    pub fn set_poll_waker(&mut self, waker: impl Fn() + Send + Sync + 'static) {
        if !self.wake_handlers_unset {
            panic!("Stakker::set_poll_waker called more than once");
        }
        self.wake_handlers = WakeHandlers::new(Box::new(waker));
        self.wake_handlers_unset = false;
    }

    /// Indicate to **Stakker** that the main thread has been woken up
    /// due to a call from another thread to the waker configured with
    /// `set_poll_waker`.  The I/O polling implementation
    /// (e.g. `stakker_mio`) makes this call to let **Stakker** know
    /// that it should do its [`Waker`] handling.
    ///
    /// [`Waker`]: struct.Waker.html
    pub fn poll_wake(&mut self) {
        // Due to wake handlers needing a Stakker ref, we can't have
        // any borrow on the `wake_handlers` active.  So we have to
        // pull each handler out to call it, and then put it back in
        // again afterwards.  This is only a `usize` being changed in
        // memory, though.
        for bit in self.wake_handlers.wake_list() {
            if let Some(mut cb) = self.wake_handlers.handler_borrow(bit) {
                cb(self, false);
                self.wake_handlers.handler_restore(bit, cb);
            }
        }
    }

    // Process all the outstanding wake handler drops
    #[cfg(feature = "inter-thread")]
    pub(crate) fn process_waker_drops(&mut self) {
        for bit in self.wake_handlers.drop_list() {
            if let Some(mut cb) = self.wake_handlers.del(bit) {
                cb(self, true);
            }
        }
    }

    // For testing
    #[cfg(all(test, feature = "inter-thread"))]
    pub(crate) fn poll_waker_handler_count(&self) -> usize {
        self.wake_handlers.handler_count()
    }
}

impl Deref for Stakker {
    type Target = Core;
    fn deref(&self) -> &Core {
        &self.core
    }
}

impl DerefMut for Stakker {
    fn deref_mut(&mut self) -> &mut Core {
        &mut self.core
    }
}

impl Drop for Stakker {
    fn drop(&mut self) {
        // The defer queue may contain items that have circular
        // references back to the defer queue via a Deferrer, which
        // means they never get freed.  In addition, when an actor
        // referenced from the queue is dropped, it may queue a
        // handler, which in turn holds a circular reference.
        // So special handling is required to avoid leaking memory on
        // Stakker termination.  Limit this to 99 attempts, just to
        // avoid the possibility of an infinite loop.
        for _ in 0..99 {
            let mut alt_main = FnOnceQueue::new();
            self.core.deferrer.swap_queue(&mut alt_main);
            if alt_main.is_empty() {
                break;
            }
            drop(alt_main);
        }
    }
}

/// Core operations available from both [`Stakker`] and [`Cx`] objects
///
/// Both [`Stakker`] and [`Cx`] references auto-dereference to a
/// [`Core`] reference, so typically either of those can be used
/// wherever a [`Core`] reference is required.
///
/// [`Core`]: struct.Core.html
/// [`Cx`]: struct.Cx.html
/// [`Stakker`]: struct.Stakker.html
pub struct Core {
    now: Instant,
    pub(crate) deferrer: Deferrer,
    lazy_queue: FnOnceQueue<Stakker>,
    idle_queue: VecDeque<Box<dyn FnOnce(&mut Stakker) + 'static>>,
    timers: Timers<Stakker>,
    shutdown: Option<StopCause>,
    pub(crate) sharecell_owner: ShareCellOwner,
    pub(crate) actor_maker: ActorCellMaker,
    #[cfg(feature = "anymap")]
    anymap: anymap::Map,
    systime: Option<SystemTime>,
    wake_handlers: WakeHandlers,
    wake_handlers_unset: bool,
    #[cfg(feature = "logger")]
    log_id_seq: u64,
    #[cfg(feature = "logger")]
    log_filter: LogFilter,
    #[cfg(feature = "logger")]
    logger: Option<Box<dyn FnMut(&mut Core, &LogRecord<'_>)>>,
}

impl Core {
    pub(crate) fn new(now: Instant, actor_maker: ActorCellMaker) -> Self {
        let deferrer = Deferrer::new();
        // Intentionally drop any queued items belonging to a previous
        // Stakker.  This is to align the behaviour between the three
        // Deferrer implementations, to avoid dependence on any one
        // behaviour.  (With the 'inline' Deferrer, each Stakker has
        // its own Deferrer queue, but the other two use a global or
        // thread-local which may have data from a previous Stakker.)
        deferrer.set_queue(FnOnceQueue::new());
        Self {
            now,
            deferrer,
            lazy_queue: FnOnceQueue::new(),
            idle_queue: VecDeque::new(),
            timers: Timers::new(now),
            shutdown: None,
            sharecell_owner: new_share_cell_owner(),
            actor_maker,
            #[cfg(feature = "anymap")]
            anymap: anymap::Map::new(),
            systime: None,
            wake_handlers: WakeHandlers::new(Box::new(|| unreachable!())),
            wake_handlers_unset: true,
            #[cfg(feature = "logger")]
            log_id_seq: 0,
            #[cfg(feature = "logger")]
            log_filter: LogFilter::new(),
            #[cfg(feature = "logger")]
            logger: None,
        }
    }

    /// Our view of the current time.  Actors should use this in
    /// preference to `Instant::now()` for speed, and in order to work
    /// in virtual time.
    #[inline]
    pub fn now(&self) -> Instant {
        self.now
    }

    /// Get the current `SystemTime`.  Normally this returns the same
    /// as `SystemTime::now()`, but if running in virtual time, it
    /// returns the virtual `SystemTime` instead (as provided to
    /// [`Stakker::set_systime`] by the virtual-time main loop).  Note
    /// that this time is not suitable for timing things, as it may go
    /// backwards if the user or a system process adjusts the clock.
    /// It is just useful for showing or recording "human time" for
    /// the user, and for recording times that are meaningful on a
    /// longer scale, e.g. from one run of a process to the next.
    ///
    /// [`Stakker::set_systime`]: struct.Stakker.html#method.set_systime
    #[inline]
    pub fn systime(&self) -> SystemTime {
        if let Some(time) = self.systime {
            time
        } else {
            SystemTime::now()
        }
    }

    /// Defer an operation to be executed later.  It is put on the
    /// main queue, and run as soon as all operations preceding it
    /// have been executed.  See also the [`call!`] macro.
    ///
    /// [`call!`]: macro.call.html
    #[inline]
    pub fn defer(&mut self, f: impl FnOnce(&mut Stakker) + 'static) {
        self.deferrer.defer(f);
    }

    /// Defer an operation to be executed soon, but lazily.
    /// It goes onto a lower-priority queue executed once the normal
    /// defer queue has been completely cleared (including any further
    /// deferred items added whilst clearing that queue).  This can be
    /// used for flushing data generated in this batch of processing,
    /// for example.  See also the [`lazy!`] macro.
    ///
    /// [`lazy!`]: macro.lazy.html
    #[inline]
    pub fn lazy(&mut self, f: impl FnOnce(&mut Stakker) + 'static) {
        self.lazy_queue.push(f);
    }

    /// Defer an operation to be executed when this process next
    /// becomes idle, i.e. when all other queues are empty and there
    /// is no I/O to process.  This can be used to implement
    /// backpressure on incoming streams, i.e. only fetch more data
    /// once there is nothing else left to do.  See also the [`idle!`]
    /// macro.
    ///
    /// [`idle!`]: macro.idle.html
    #[inline]
    pub fn idle(&mut self, f: impl FnOnce(&mut Stakker) + 'static) {
        self.idle_queue.push_back(Box::new(f));
    }

    /// Delay an operation, to be executed after the given duration
    /// has passed.  This is the same as adding it as a fixed timer.
    /// Returns a key that can be used to delete the timer.  See also
    /// the [`after!`] macro.
    ///
    /// [`after!`]: macro.after.html
    #[inline]
    pub fn after(
        &mut self,
        dur: Duration,
        f: impl FnOnce(&mut Stakker) + 'static,
    ) -> FixedTimerKey {
        self.timers.add(self.now + dur, Box::new(f))
    }

    /// Add a fixed timer that expires at the given time.  Returns a
    /// key that can be used to delete the timer.  See also the
    /// [`at!`] macro.
    ///
    /// [`at!`]: macro.at.html
    #[inline]
    pub fn timer_add(
        &mut self,
        expiry: Instant,
        f: impl FnOnce(&mut Stakker) + 'static,
    ) -> FixedTimerKey {
        self.timers.add(expiry, Box::new(f))
    }

    /// Delete a fixed timer.  Returns `true` on success, or `false`
    /// if the timer no longer exists (i.e. it expired or was deleted)
    #[inline]
    pub fn timer_del(&mut self, key: FixedTimerKey) -> bool {
        self.timers.del(key)
    }

    /// Add a "Max" timer, which expires at the greatest (latest)
    /// expiry time provided.
    /// See [`MaxTimerKey`] for the characteristics of this timer.
    /// Returns a key that can be used to delete or modify the timer.
    ///
    /// See also the [`timer_max!`] macro, which may be more
    /// convenient as it combines [`Core::timer_max_add`] and
    /// [`Core::timer_max_upd`].
    ///
    /// [`Core::timer_max_add`]: struct.Core.html#method.timer_max_add
    /// [`Core::timer_max_upd`]: struct.Core.html#method.timer_max_upd
    /// [`MaxTimerKey`]: struct.MaxTimerKey.html
    /// [`timer_max!`]: macro.timer_max.html
    #[inline]
    pub fn timer_max_add(
        &mut self,
        expiry: Instant,
        f: impl FnOnce(&mut Stakker) + 'static,
    ) -> MaxTimerKey {
        self.timers.add_max(expiry, Box::new(f))
    }

    /// Update a "Max" timer with a new expiry time.  It will be used
    /// as the new expiry time only if it is greater than the current
    /// expiry time.  This call is designed to be very cheap to call
    /// frequently.
    ///
    /// Returns `true` on success, or `false` if the timer no longer
    /// exists (i.e. it expired or was deleted)
    ///
    /// See also the [`timer_max!`] macro, which may be more
    /// convenient as it combines [`Core::timer_max_add`] and
    /// [`Core::timer_max_upd`].
    ///
    /// [`Core::timer_max_add`]: struct.Core.html#method.timer_max_add
    /// [`Core::timer_max_upd`]: struct.Core.html#method.timer_max_upd
    /// [`timer_max!`]: macro.timer_max.html
    #[inline]
    pub fn timer_max_upd(&mut self, key: MaxTimerKey, expiry: Instant) -> bool {
        self.timers.mod_max(key, expiry)
    }

    /// Delete a "Max" timer.  Returns `true` on success, or `false`
    /// if the timer no longer exists (i.e. it expired or was deleted)
    #[inline]
    pub fn timer_max_del(&mut self, key: MaxTimerKey) -> bool {
        self.timers.del_max(key)
    }

    /// Check whether a "Max" timer is active.  Returns `true` if it
    /// exists and is active, or `false` if it expired, was deleted or
    /// never existed
    #[inline]
    pub fn timer_max_active(&mut self, key: MaxTimerKey) -> bool {
        self.timers.max_is_active(key)
    }

    /// Add a "Min" timer, which expires at the smallest (earliest)
    /// expiry time provided.
    /// See [`MinTimerKey`] for the characteristics of this timer.
    /// Returns a key that can be used to delete or modify the timer.
    ///
    /// See also the [`timer_min!`] macro, which may be more
    /// convenient as it combines [`Core::timer_min_add`] and
    /// [`Core::timer_min_upd`].
    ///
    /// [`Core::timer_min_add`]: struct.Core.html#method.timer_min_add
    /// [`Core::timer_min_upd`]: struct.Core.html#method.timer_min_upd
    /// [`MinTimerKey`]: struct.MinTimerKey.html
    /// [`timer_min!`]: macro.timer_min.html
    #[inline]
    pub fn timer_min_add(
        &mut self,
        expiry: Instant,
        f: impl FnOnce(&mut Stakker) + 'static,
    ) -> MinTimerKey {
        self.timers.add_min(expiry, Box::new(f))
    }

    /// Update a "Min" timer with a new expiry time.  It will be used
    /// as the new expiry time only if it is earlier than the current
    /// expiry time.  This call is designed to be very cheap to call
    /// frequently, so long as the change is within the wiggle-room
    /// allowed.  Otherwise it causes the working timer to be deleted
    /// and added again, readjusting the wiggle-room accordingly.
    ///
    /// Returns `true` on success, or `false` if the timer no longer
    /// exists (i.e. it expired or was deleted)
    ///
    /// See also the [`timer_min!`] macro, which may be more
    /// convenient as it combines [`Core::timer_min_add`] and
    /// [`Core::timer_min_upd`].
    ///
    /// [`Core::timer_min_add`]: struct.Core.html#method.timer_min_add
    /// [`Core::timer_min_upd`]: struct.Core.html#method.timer_min_upd
    /// [`timer_min!`]: macro.timer_min.html
    #[inline]
    pub fn timer_min_upd(&mut self, key: MinTimerKey, expiry: Instant) -> bool {
        self.timers.mod_min(key, expiry)
    }

    /// Delete a "Min" timer.  Returns `true` on success, or `false`
    /// if the timer no longer exists (i.e. it expired or was deleted)
    #[inline]
    pub fn timer_min_del(&mut self, key: MinTimerKey) -> bool {
        self.timers.del_min(key)
    }

    /// Check whether a "Min" timer is active.
    /// Returns `true` if it exists and is active, or `false` if it
    /// expired, was deleted or never existed
    #[inline]
    pub fn timer_min_active(&mut self, key: MinTimerKey) -> bool {
        self.timers.min_is_active(key)
    }

    /// Put a value into the `anymap`.  This can be accessed using the
    /// [`Core::anymap_get`] or [`Core::anymap_try_get`] call.  An
    /// anymap can store one value for each type (see crate
    /// [`anymap`](https://docs.rs/anymap)).  The value must implement
    /// `Clone`, i.e. it must act something like an `Rc` or else be
    /// copyable data.
    ///
    /// This is intended to be used for storing certain global
    /// instances which actors may need to get hold of, for example an
    /// access-point for the I/O poll implementation that Stakker is
    /// running under.  In other words the `anymap` is intended to
    /// represent the environment.
    ///
    /// There's nothing I can do to stop you using this like an
    /// inefficient global variable store, but doing that would be a
    /// terrible idea.  Using the `anymap` that way breaks the actor
    /// model and makes your code harder to reason about.  Really it
    /// would be cleaner to use a [`Share`] if you need to break the
    /// actor model and share data, because at least then the
    /// interconnection between actors would be explicit, and trying
    /// to move an interconnected actor to a remote machine would fail
    /// immediately.
    ///
    /// [`Core::anymap_get`]: struct.Core.html#method.anymap_get
    /// [`Core::anymap_try_get`]: struct.Core.html#method.anymap_try_get
    /// [`Share`]: struct.Share.html
    #[cfg_attr(not(feature = "anymap"), allow(unused_variables))]
    #[inline]
    pub fn anymap_set<T: Clone + 'static>(&mut self, val: T) {
        // If the "anymap" feature is not configured, ignore this
        // operation, but panic on the `anymap_get`
        #[cfg(feature = "anymap")]
        self.anymap.insert(val);
    }

    /// Gets a clone of a value from the Stakker `anymap`.
    /// This is intended to be used to access certain global
    /// instances, for example the I/O poll implementation that this
    /// Stakker is running inside.  Panics if the value is not found.
    pub fn anymap_get<T: Clone + 'static>(&mut self) -> T {
        self.anymap_try_get()
            .unwrap_or_else(|| panic!("No anymap entry found for {}", std::any::type_name::<T>()))
    }

    /// Tries to get a clone of a value from the Stakker `anymap`.
    /// This is intended to be used to access certain global
    /// instances, for example the I/O poll implementation that this
    /// Stakker is running inside.  Returns `None` if the value is
    /// missing.
    pub fn anymap_try_get<T: Clone + 'static>(&mut self) -> Option<T> {
        #[cfg(feature = "anymap")]
        return self.anymap.get::<T>().cloned();
        #[cfg(not(feature = "anymap"))]
        panic!("Enable feature 'anymap' to use anymap_get() or anymap_try_get()");
    }

    /// Request that the event loop terminate.  For this to work, the
    /// event loop must check [`Core::not_shutdown`] each time through
    /// the loop.  See also the [`ret_shutdown!`] macro, which can be
    /// used as the [`StopCause`] handler for an actor, to shut down
    /// the event loop when that actor terminates.  The event loop
    /// code can obtain the [`StopCause`] using
    /// [`Core::shutdown_reason`].
    ///
    /// [`Core::not_shutdown`]: struct.Core.html#method.not_shutdown
    /// [`Core::shutdown_reason`]: struct.Core.html#method.shutdown_reason
    /// [`StopCause`]: enum.StopCause.html
    /// [`ret_shutdown!`]: macro.ret_shutdown.html
    pub fn shutdown(&mut self, cause: StopCause) {
        self.shutdown = Some(cause);
    }

    /// Should the event loop continue running?  Returns `true` if
    /// there is no active shutdown in progress.
    pub fn not_shutdown(&self) -> bool {
        self.shutdown.is_none()
    }

    /// Get the reason for shutdown, if shutdown was requested.  After
    /// calling this, the shutdown flag is cleared,
    /// i.e. [`Core::not_shutdown`] will return `true` and the event
    /// loop could continue to run.
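    ///
    /// An illustrative check at the end of a main loop (a sketch,
    /// not taken from this crate's docs):
    ///
    /// ```ignore
    /// while stakker.not_shutdown() {
    ///     // ... poll I/O and call stakker.run(...) ...
    /// }
    /// let _cause = stakker.shutdown_reason(); // why the loop ended
    /// ```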
    ///
    /// [`Core::not_shutdown`]: struct.Core.html#method.not_shutdown
    pub fn shutdown_reason(&mut self) -> Option<StopCause> {
        self.shutdown.take()
    }

    /// Get a new [`Deferrer`] instance, which can be used to defer
    /// calls to the main queue from contexts in the same thread which
    /// don't have access to [`Core`], for example drop handlers.
    ///
    /// [`Core`]: struct.Core.html
    /// [`Deferrer`]: struct.Deferrer.html
    pub fn deferrer(&self) -> Deferrer {
        assert!(mem::size_of::<usize>() >= mem::size_of::<Deferrer>());
        self.deferrer.clone()
    }

    /// Register a wake handler callback, and obtain a [`Waker`]
    /// instance which can be passed to another thread.  The wake
    /// handler will always be executed in the main thread.  When
    /// [`Waker::wake`] is called in another thread, a wake-up is
    /// scheduled to occur in the main thread, using the wake-up
    /// mechanism provided by the I/O poller.  Then, when that wake-up
    /// is received, the corresponding wake handler is executed.  Note
    /// that this is efficient: if many wake handlers are scheduled
    /// around the same time, they share the same main-thread wake-up.
    ///
    /// The wake handler is called in the main thread with arguments
    /// of `(stakker, deleted)`.  Note that there is a small chance of
    /// a spurious wake call happening occasionally, so the wake
    /// handler code must be ready for that.  If `deleted` is true,
    /// then the [`Waker`] was dropped, and this wake handler is also
    /// just about to be dropped.
    ///
    /// This call panics if no I/O poller has yet set up a waker using
    /// [`Stakker::set_poll_waker`].
    ///
    /// [`Stakker::set_poll_waker`]: struct.Stakker.html#method.set_poll_waker
    /// [`Waker::wake`]: struct.Waker.html#method.wake
    /// [`Waker`]: struct.Waker.html
    pub fn waker(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
        if self.wake_handlers_unset {
            panic!("Core::waker() called with no waker set up");
        }
        self.wake_handlers.add(cb)
    }

    /// Borrow two [`Share`] instances mutably at the same time.  This
    /// will panic if they are the same instance.
    ///
    /// [`Share`]: struct.Share.html
    #[inline]
    pub fn share_rw2<'a, T, U>(
        &'a mut self,
        s1: &'a Share<T>,
        s2: &'a Share<U>,
    ) -> (&'a mut T, &'a mut U) {
        self.sharecell_owner.rw2(&s1.rc, &s2.rc)
    }

    /// Borrow three [`Share`] instances mutably at the same time.
    /// This will panic if any two are the same instance.
    ///
    /// [`Share`]: struct.Share.html
    #[inline]
    pub fn share_rw3<'a, T, U, V>(
        &'a mut self,
        s1: &'a Share<T>,
        s2: &'a Share<U>,
        s3: &'a Share<V>,
    ) -> (&'a mut T, &'a mut U, &'a mut V) {
        self.sharecell_owner.rw3(&s1.rc, &s2.rc, &s3.rc)
    }

    /// Pass a log-record to the current logger, if one is active and
    /// if the log-level is enabled.  Otherwise the call is ignored.
    /// `id` should be the logging-ID (obtained from `actor.id()` or
    /// `cx.id()`, or `core.log_span_open()` for non-actor spans), or
    /// 0 if the log-record doesn't belong to any span.  The arguments
    /// are used to form the [`LogRecord`] that is passed to the
    /// logger.
    ///
    /// Normally you would use a logging macro which wraps this call,
    /// provided by an external crate such as `stakker_log`.
    ///
    /// This call does nothing unless the **logger** feature is
    /// enabled.
    ///
    /// [`LogRecord`]: struct.LogRecord.html
    #[inline]
    #[allow(unused_variables)]
    pub fn log(
        &mut self,
        id: LogID,
        level: LogLevel,
        target: &str,
        fmt: Arguments<'_>,
        kvscan: impl Fn(&mut dyn LogVisitor),
    ) {
        // It might seem like it would be better to build the
        // LogRecord first and then pass its address to this call, but
        // actually that doesn't work, because `format_args!` creates
        // some temporaries that only live to the end of the enclosing
        // statement.  So `format_args!` can only be used within the
        // argument list of the function that consumes those
        // arguments.  However, making this #[inline] means that the
        // LogRecord will be built in place on the stack, and then its
        // address will be passed to the logger.
        #[cfg(feature = "logger")]
        if self.log_check(level) {
            if let Some(mut logger) = self.logger.take() {
                logger(
                    self,
                    &LogRecord {
                        id,
                        level,
                        target,
                        fmt,
                        kvscan: &kvscan,
                    },
                );
                self.logger = Some(logger);
            }
        }
    }

    /// Check whether a log-record with the given [`LogLevel`] should
    /// be logged
    ///
    /// [`LogLevel`]: enum.LogLevel.html
    #[inline]
    #[allow(unused_variables)]
    pub fn log_check(&self, level: LogLevel) -> bool {
        #[cfg(feature = "logger")]
        {
            self.log_filter.allows(level)
        }
        #[cfg(not(feature = "logger"))]
        false
    }

    /// Allocate a new logging-ID and write a [`LogLevel::Open`]
    /// record to the logger.  `tag` will be included as the record's
    /// text, and should indicate what kind of span it is, e.g. the
    /// type name for an actor.  The tag would not normally contain
    /// any dynamic information.  If `parent_id` is non-zero, then a
    /// `parent` key will be added with that value.  `kvscan` will be
    /// called to add any other key-value pairs as required, which is
    /// where the dynamic information should go.
    ///
    /// This is used by actors on startup to allocate a logging-ID for
    /// the span of the actor's lifetime.  However other code that
    /// needs to do logging within a certain identifiable span can
    /// also make use of this call.
    /// The new span should be related to another span using
    /// `parent_id` (if possible), and [`Core::log_span_close`] should
    /// be called when the span is complete.
    ///
    /// In the unlikely event that a program allocates 2^64 logging
    /// IDs, the IDs will wrap around to 1 again.  If this is likely
    /// to cause a problem downstream, the logger implementation
    /// should detect this and warn or terminate as appropriate.
    ///
    /// This call does nothing unless the **logger** feature is
    /// enabled.
    ///
    /// [`Core::log_span_close`]: struct.Core.html#method.log_span_close
    /// [`LogLevel::Open`]: enum.LogLevel.html#variant.Open
    #[inline]
    #[allow(unused_variables)]
    pub fn log_span_open(
        &mut self,
        tag: &str,
        parent_id: LogID,
        kvscan: impl Fn(&mut dyn LogVisitor),
    ) -> LogID {
        #[cfg(feature = "logger")]
        {
            self.log_id_seq = self.log_id_seq.wrapping_add(1).max(1);
            let id = self.log_id_seq;
            self.log(
                id,
                LogLevel::Open,
                "",
                format_args!("{}", tag),
                move |output| {
                    if parent_id != 0 {
                        output.kv_u64(Some("parent"), parent_id);
                    }
                    kvscan(output);
                },
            );
            id
        }
        #[cfg(not(feature = "logger"))]
        0
    }

    /// Write a [`LogLevel::Close`] record to the logger.  `fmt` is a
    /// message which may give more information, e.g. the error
    /// message in the case of a failure.  `kvscan` will be called to
    /// add key-value pairs to the record.
    ///
    /// This call does nothing unless the **logger** feature is
    /// enabled.
    ///
    /// [`LogLevel::Close`]: enum.LogLevel.html#variant.Close
    #[inline]
    #[allow(unused_variables)]
    pub fn log_span_close(
        &mut self,
        id: LogID,
        fmt: Arguments<'_>,
        kvscan: impl Fn(&mut dyn LogVisitor),
    ) {
        #[cfg(feature = "logger")]
        self.log(id, LogLevel::Close, "", fmt, kvscan);
    }

    /// Used in macros to get a [`Core`] reference
    ///
    /// [`Core`]: struct.Core.html
    #[inline]
    pub fn access_core(&mut self) -> &mut Core {
        self
    }

    /// Used in macros to get a [`Deferrer`] reference
    ///
    /// [`Deferrer`]: struct.Deferrer.html
    #[inline]
    pub fn access_deferrer(&self) -> &Deferrer {
        &self.deferrer
    }

    /// Used in macros to get the [`LogID`] in case this is an actor
    /// or context.  Since it isn't, this call returns 0.
    ///
    /// [`LogID`]: type.LogID.html
    #[inline]
    pub fn access_log_id(&self) -> LogID {
        0
    }
}
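
// A minimal usage sketch (added for illustration, not part of the
// original source): it exercises the defer-then-run cycle documented
// above, assuming only the public behaviour described in this file
// (a deferred closure runs on the next `run()` call).
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use std::cell::Cell;
    use std::rc::Rc;

    #[test]
    fn defer_runs_on_next_run_call() {
        let now = Instant::now();
        let mut stakker = Stakker::new(now);
        let flag = Rc::new(Cell::new(false));
        let flag2 = flag.clone();
        // The deferred closure receives `&mut Stakker` when the main
        // queue is executed
        stakker.defer(move |_s| flag2.set(true));
        assert!(!flag.get());
        stakker.run(now, false); // drain the main and lazy queues
        assert!(flag.get());
    }
}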