Struct stakker::Stakker

pub struct Stakker { /* private fields */ }

The external interface to the actor runtime

This contains all the queues and timers, and controls access to the state of all the actors. It also provides the interface to control all this from outside the runtime, i.e. it provides the calls used by an event loop. The Stakker instance itself is not accessible from actors due to borrowing restrictions. It derefs to a Core reference through auto-deref or *stakker.

Implementations

impl Stakker

pub fn new(now: Instant) -> Self

Construct a Stakker instance. Whether more than one instance can be created in each process or thread depends on the Cargo features enabled.

Example from the repository (examples/actor_own_anon.rs):
fn main() {
    let mut stakker = Stakker::new(Instant::now());
    let s = &mut stakker;

    let cat = actor!(s, Cat::init(), ret_nop!());
    call_and_drop(fwd_to!([cat], sound() as ()), cat.anon());

    let dog = actor!(s, Dog::init(), ret_nop!());
    call_and_drop(fwd_to!([dog], sound() as ()), dog.anon());

    s.run(Instant::now(), false);
}
Another example (examples/tutorial.rs):
fn main() {
    // Contains all the queues and timers, and controls access to the
    // state of all the actors.
    let mut stakker0 = Stakker::new(Instant::now());
    let stakker = &mut stakker0;

    // Create and initialise the Light and Flasher actors.  The
    // Flasher actor is given a reference to the Light.  Use a
    // StopCause handler to shutdown when the Flasher terminates.
    let light = actor!(stakker, Light::init(), ret_nop!());

    let _flasher = actor!(
        stakker,
        Flasher::init(light.clone(), Duration::from_secs(1), 6),
        ret_shutdown!(stakker)
    );

    // Since we're not in virtual time, we use `Instant::now()` in
    // this loop, which is then passed on to all the actors as
    // `cx.now()`.  (If you want to run time faster or slower you
    // could use another source of time.)  So all calls in a batch of
    // processing get the same `cx.now()` value.  Also note that
    // `Instant::now()` uses a Mutex on some platforms so it saves
    // cycles to call it less often.
    stakker.run(Instant::now(), false);
    while stakker.not_shutdown() {
        // Wait for next timer to expire.  Here there's no I/O polling
        // required to wait for external events, so just `sleep`
        let maxdur = stakker.next_wait_max(Instant::now(), Duration::from_secs(60), false);
        std::thread::sleep(maxdur);

        // Run queue and timers
        stakker.run(Instant::now(), false);
    }
}

pub fn next_expiry(&mut self) -> Option<Instant>

Return the next timer expiry time, or None

pub fn next_wait(&mut self, now: Instant) -> Option<Duration>

Return how long we need to wait for the next timer, or None if there are no timers to wait for

pub fn next_wait_max(&mut self, now: Instant, maxdur: Duration, idle_pending: bool) -> Duration

Return how long to wait for the next I/O poll. If there are idle items queued (idle_pending is true), return 0 seconds, which allows the caller to quickly check for more I/O and then run the idle queue if there is nothing to do. If there is a timer active, return the time to wait for that timer, limited by maxdur. If there is nothing to wait for, just return maxdur.
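
The three cases above can be sketched as a small standalone function. This is only a model of the described behaviour, not Stakker's own code: `wait_for_poll` and its `next_expiry` parameter (standing in for the runtime's earliest pending timer) are hypothetical names introduced for illustration.

```rust
use std::time::{Duration, Instant};

/// Model of the wait computation described above (an assumption-based
/// sketch, not the library's implementation).
fn wait_for_poll(
    now: Instant,
    next_expiry: Option<Instant>,
    maxdur: Duration,
    idle_pending: bool,
) -> Duration {
    if idle_pending {
        // Idle items are queued: poll I/O without blocking.
        return Duration::from_secs(0);
    }
    match next_expiry {
        // A timer is active: wait until it expires, capped at `maxdur`.
        // `saturating_duration_since` yields zero if the timer is already due.
        Some(expiry) => expiry.saturating_duration_since(now).min(maxdur),
        // Nothing to wait for: use the full `maxdur`.
        None => maxdur,
    }
}
```

With a 60 second `maxdur`, a timer due in 5 seconds gives a 5 second wait, while a timer due in 90 seconds is capped at 60 seconds.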

pub fn run(&mut self, now: Instant, idle: bool) -> bool

Move time forward, expire any timers onto the main Deferrer queue, then run main and lazy queues until there is nothing outstanding. Returns true if there are idle items still to run.

If idle is true, then runs an item from the idle queue as well. This should be set only if we’ve already run the queues and just polled I/O (without waiting) and still there’s nothing to do.

Note: All actors should use cx.now() to get the time, which allows the entire system to be run in virtual time (unrelated to real time) if necessary.
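
To make the queue ordering concrete, here is a toy model of the main, lazy and idle queues. It is a sketch under assumptions: the `Queues` type, its fields and the choice to take one lazy item at a time before re-checking the main queue are all invented for illustration, and timers are left out.

```rust
use std::collections::VecDeque;

type Job = Box<dyn FnOnce(&mut Queues)>;

/// Toy stand-in for the runtime's queues (hypothetical, std-only).
#[derive(Default)]
struct Queues {
    main: VecDeque<Job>,
    lazy: VecDeque<Job>,
    idle: VecDeque<Job>,
    trace: Vec<&'static str>,
}

impl Queues {
    /// Drain main and lazy until both are empty; if `idle` is set, also
    /// run one idle item. Returns true if idle items remain.
    fn run(&mut self, idle: bool) -> bool {
        let mut may_idle = idle;
        loop {
            // Main queue first, including anything queued while draining it.
            while let Some(job) = self.main.pop_front() {
                job(self);
            }
            // Only once main is empty, take a lazy item, then re-check main.
            if let Some(job) = self.lazy.pop_front() {
                job(self);
                continue;
            }
            // At most one idle item per call, and only when requested.
            if may_idle {
                may_idle = false;
                if let Some(job) = self.idle.pop_front() {
                    job(self);
                    continue;
                }
            }
            return !self.idle.is_empty();
        }
    }
}
```

In this model, a main-queue job that pushes another main-queue job sees it run before any lazy item, and an idle item runs only on a call where `idle` is true.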

pub fn set_systime(&mut self, systime: Option<SystemTime>)

Set the current SystemTime, for use in a virtual time main loop. If None is passed, then core.systime() just calls SystemTime::now(). Otherwise core.systime() returns the provided SystemTime instead.
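
The override behaviour can be modelled with an `Option<SystemTime>` field. The `Clock` type below is a hypothetical stand-in used only to illustrate the rule; it is not part of Stakker.

```rust
use std::time::SystemTime;

/// Toy clock mirroring the override rule described above.
struct Clock {
    systime: Option<SystemTime>,
}

impl Clock {
    /// `Some(t)` pins the reported time to `t`; `None` restores the real clock.
    fn set_systime(&mut self, systime: Option<SystemTime>) {
        self.systime = systime;
    }

    /// Returns the pinned time if one is set, otherwise `SystemTime::now()`.
    fn systime(&self) -> SystemTime {
        self.systime.unwrap_or_else(SystemTime::now)
    }
}
```

A virtual-time main loop would call `set_systime(Some(..))` before each batch of processing so that every reader in that batch sees the same value.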

pub fn set_logger(&mut self, filter: LogFilter, logger: impl FnMut(&mut Core, &LogRecord<'_>) + 'static)

Set the logger and logging level

The provided logger will be called synchronously every time a Core::log call is made if the logging level is enabled. It is provided with a Core reference, so can access a Share, or defer calls to actors as necessary. It may alternatively choose to forward the logging to an external log framework, such as the log or tracing crates.

The enabled logging levels are described by filter. Typically you’d set something like LogFilter::all(&[LogLevel::Info, LogLevel::Audit, LogLevel::Open]) or LogFilter::from_str("info,audit,open"), which enable info and above, plus audit and span open/close. See LogFilter.

Note that the logger feature must be enabled for this call to succeed. The Stakker crate provides only the core logging functionality. It adds a 64-bit logging ID to each actor and logs actor startup and termination. It provides the framework for logging formatted-text and key-value pairs along with an actor’s logging-ID for context. An external crate like stakker_log may be used to provide macros that allow convenient logging from actor code and to allow interfacing to external logging systems.
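
As an illustration of the "info and above, plus audit and span open/close" filter described above, here is a bitmask model. Everything in it is an assumption made for the sketch: the `Level` enum, its ordering and the `Filter` type are hypothetical and may not match the real `LogLevel` and `LogFilter`.

```rust
/// Hypothetical level set; the real `LogLevel` may differ.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Level { Trace, Debug, Info, Warn, Error, Audit, Open, Close }

/// Bitmask of enabled levels (one bit per level).
#[derive(Clone, Copy, PartialEq, Debug)]
struct Filter(u32);

impl Filter {
    fn bit(level: Level) -> u32 {
        1 << (level as u32)
    }

    /// Each listed severity enables itself and every severity above it;
    /// `Audit` enables only itself, and `Open` also enables `Close`.
    fn all(levels: &[Level]) -> Filter {
        let mut mask = 0u32;
        for &level in levels {
            mask |= match level {
                Level::Audit => Self::bit(Level::Audit),
                Level::Open => Self::bit(Level::Open) | Self::bit(Level::Close),
                Level::Close => Self::bit(Level::Close),
                severity => {
                    // Bits from `severity` up to and including `Error`.
                    let up_to_error = (Self::bit(Level::Error) << 1) - 1;
                    up_to_error & !(Self::bit(severity) - 1)
                }
            };
        }
        Filter(mask)
    }

    fn allows(self, level: Level) -> bool {
        (self.0 & Self::bit(level)) != 0
    }
}
```

Under these assumptions, `Filter::all(&[Level::Info, Level::Audit, Level::Open])` admits `Info`, `Warn`, `Error`, `Audit`, `Open` and `Close`, but not `Trace` or `Debug`.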

pub fn set_poll_waker(&mut self, waker: impl Fn() + Send + Sync + 'static)

Used to provide Stakker with a means to wake the main thread from another thread. This enables Waker and associated functionality. A poll-waker is not required otherwise.

Normally the main thread will be blocked waiting for I/O events most of the time, with a timeout to handle the next-expiring timer. If Stakker code running in another thread wants to defer a call to the main thread, then it needs a way to interrupt that blocked call. This is done via creating an artificial I/O event. (For example, mio handles this with a mio::Waker instance which wraps various platform-specific ways of creating an artificial I/O event.)

So Stakker calls the waker provided to this call, which causes the I/O polling implementation to trigger an artificial I/O event, which results in the I/O polling implementation calling Stakker::poll_wake().

Normally the poll-waker will be set up automatically by the user’s chosen I/O polling implementation (for example stakker_mio) on initialisation.
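
The wake-up path can be modelled without any I/O library by using a condition variable in place of the artificial I/O event. This is a sketch only: `Poller`, `waker` and `poll` are hypothetical names, and a real poller (such as one built on mio) would block on file descriptors rather than a `Condvar`.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Stand-in for an I/O poller that another thread can interrupt.
struct Poller {
    woken: Arc<(Mutex<bool>, Condvar)>,
}

impl Poller {
    fn new() -> Self {
        Poller { woken: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// A closure of the shape `set_poll_waker` expects (`Fn() + Send + Sync`):
    /// calling it from any thread raises the artificial event.
    fn waker(&self) -> Box<dyn Fn() + Send + Sync> {
        let woken = Arc::clone(&self.woken);
        Box::new(move || {
            let (flag, cond) = &*woken;
            *flag.lock().unwrap() = true;
            cond.notify_one();
        })
    }

    /// Block until woken or until `timeout` (the next timer) elapses.
    /// Returns true if the artificial event fired, and clears it.
    fn poll(&self, timeout: Duration) -> bool {
        let (flag, cond) = &*self.woken;
        let guard = flag.lock().unwrap();
        let (mut guard, _) = cond
            .wait_timeout_while(guard, timeout, |woken| !*woken)
            .unwrap();
        std::mem::replace(&mut *guard, false)
    }
}
```

When `poll` returns true, the main loop in this model would be the place to call `Stakker::poll_wake()`.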

pub fn poll_wake(&mut self)

Indicate to Stakker that the main thread has been woken up due to a call from another thread to the waker configured with set_poll_waker. The I/O polling implementation (e.g. stakker_mio) makes this call to let Stakker know that it should do its Waker handling.

Methods from Deref<Target = Core>

pub fn now(&self) -> Instant

Our view of the current time. Actors should use this in preference to Instant::now() for speed and in order to work in virtual time.

Example from the repository (examples/tutorial.rs):
    pub fn init(cx: CX![]) -> Option<Self> {
        // Use cx.now() instead of Instant::now() to allow execution
        // in virtual time if supported by the environment.
        let start = cx.now();
        Some(Self { start, on: false })
    }

    // Methods that may be called once the actor is "Ready" have a
    // `&mut self` or `&self` first argument.
    pub fn set(&mut self, cx: CX![], on: bool) {
        self.on = on;
        let time = cx.now() - self.start;
        println!(
            "{:04}.{:03} Light on: {}",
            time.as_secs(),
            time.subsec_millis(),
            on
        );
    }

pub fn systime(&self) -> SystemTime

Get the current SystemTime. Normally this returns the same as SystemTime::now(), but if running in virtual time, it would return the virtual SystemTime instead (as provided to Stakker::set_systime by the virtual time main loop). Note that this time is not suitable for timing things, as it may go backwards if the user or a system process adjusts the clock. It is just useful for showing or recording “human time” for the user, and for recording times that are meaningful on a longer scale, e.g. from one run of a process to the next.

pub fn start_instant(&self) -> Instant

Get the Instant which was passed to Stakker::new when this runtime was started.

pub fn defer(&mut self, f: impl FnOnce(&mut Stakker) + 'static)

Defer an operation to be executed later. It is put on the main queue, and run as soon as all operations preceding it have been executed. See also the call! macro.

pub fn lazy(&mut self, f: impl FnOnce(&mut Stakker) + 'static)

Defer an operation to be executed soon, but lazily. It goes onto a lower-priority queue that is executed once the normal defer queue has been completely cleared (including any further deferred items added whilst clearing that queue). This can be used for flushing data generated in this batch of processing, for example. See also the lazy! macro.

pub fn idle(&mut self, f: impl FnOnce(&mut Stakker) + 'static)

Defer an operation to be executed when this process next becomes idle, i.e. when all other queues are empty and there is no I/O to process. This can be used to implement backpressure on incoming streams, i.e. only fetch more data once there is nothing else left to do. See also the idle! macro.

pub fn after(&mut self, dur: Duration, f: impl FnOnce(&mut Stakker) + 'static) -> FixedTimerKey

Delay an operation to be executed after a duration has passed. This is the same as adding it as a fixed timer. Returns a key that can be used to delete the timer. See also the after! macro.

pub fn timer_add(&mut self, expiry: Instant, f: impl FnOnce(&mut Stakker) + 'static) -> FixedTimerKey

Add a fixed timer that expires at the given time. Returns a key that can be used to delete the timer. See also the at! macro.

pub fn timer_del(&mut self, key: FixedTimerKey) -> bool

Delete a fixed timer. Returns true on success, false if timer no longer exists (i.e. it expired or was deleted)
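
The relationship between after, timer_add and timer_del can be shown with a toy timer store. This is a model under assumptions, not the library's data structure: `Timers`, `Key` and `advance` are hypothetical, and labels stand in for the callbacks.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Hypothetical key type standing in for `FixedTimerKey`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Key(Instant, u64);

/// Toy fixed-timer store, ordered by expiry time then insertion order.
struct Timers {
    now: Instant,
    seq: u64,
    pending: BTreeMap<Key, &'static str>,
}

impl Timers {
    fn new(now: Instant) -> Self {
        Timers { now, seq: 0, pending: BTreeMap::new() }
    }

    /// Add a timer expiring at an absolute time.
    fn timer_add(&mut self, expiry: Instant, label: &'static str) -> Key {
        self.seq += 1;
        let key = Key(expiry, self.seq);
        self.pending.insert(key, label);
        key
    }

    /// `after` is `timer_add` with the expiry computed from the current time.
    fn after(&mut self, dur: Duration, label: &'static str) -> Key {
        self.timer_add(self.now + dur, label)
    }

    /// True if the timer was still pending, false if it expired or was deleted.
    fn timer_del(&mut self, key: Key) -> bool {
        self.pending.remove(&key).is_some()
    }

    /// Move time forward and return the labels of all timers now due.
    fn advance(&mut self, now: Instant) -> Vec<&'static str> {
        self.now = now;
        let mut fired = Vec::new();
        loop {
            let next = self.pending.keys().next().copied();
            match next {
                Some(key) if key.0 <= now => fired.push(self.pending.remove(&key).unwrap()),
                _ => break,
            }
        }
        fired
    }
}
```

Deleting a key twice, or deleting it after it has fired, returns false in this model, matching the return-value rule stated above.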

pub fn timer_max_add(&mut self, expiry: Instant, f: impl FnOnce(&mut Stakker) + 'static) -> MaxTimerKey

Add a “Max” timer, which expires at the greatest (latest) expiry time provided. See MaxTimerKey for the characteristics of this timer. Returns a key that can be used to delete or modify the timer.

See also the timer_max! macro, which may be more convenient as it combines Core::timer_max_add and Core::timer_max_upd.

pub fn timer_max_upd(&mut self, key: MaxTimerKey, expiry: Instant) -> bool

Update a “Max” timer with a new expiry time. It will be used as the new expiry time only if it is greater than the current expiry time. This call is designed to be very cheap to call frequently.

Returns true on success, false if timer no longer exists (i.e. it expired or was deleted)

See also the timer_max! macro, which may be more convenient as it combines Core::timer_max_add and Core::timer_max_upd.

pub fn timer_max_del(&mut self, key: MaxTimerKey) -> bool

Delete a “Max” timer. Returns true on success, false if timer no longer exists (i.e. it expired or was deleted)

pub fn timer_max_active(&mut self, key: MaxTimerKey) -> bool

Check whether a “Max” timer is active. Returns true if it exists and is active, false if it expired or was deleted or never existed
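
One way an update can be kept cheap is to record the latest requested expiry and only re-arm the underlying timer when it fires early. The sketch below models that idea. It is an assumption about a plausible design, not a description of Stakker's internals, and `MaxTimer`, `armed`, `wanted` and `on_fire` are hypothetical names.

```rust
use std::time::Instant;

/// Toy "Max" timer: `armed` is when the underlying timer will fire,
/// `wanted` is the latest expiry requested so far.
struct MaxTimer {
    armed: Instant,
    wanted: Instant,
}

impl MaxTimer {
    fn new(expiry: Instant) -> Self {
        MaxTimer { armed: expiry, wanted: expiry }
    }

    /// Cheap update: a compare and a store. Earlier expiries are ignored.
    fn update(&mut self, expiry: Instant) {
        if expiry > self.wanted {
            self.wanted = expiry;
        }
    }

    /// Called when the underlying timer fires. Returns true if the timer
    /// has really expired; otherwise re-arms it for the later time.
    fn on_fire(&mut self) -> bool {
        if self.wanted > self.armed {
            self.armed = self.wanted;
            false
        } else {
            true
        }
    }
}
```

This shape suits things like inactivity timeouts, where the expiry is pushed back on every packet but the timer itself fires rarely.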

pub fn timer_min_add(&mut self, expiry: Instant, f: impl FnOnce(&mut Stakker) + 'static) -> MinTimerKey

Add a “Min” timer, which expires at the smallest (earliest) expiry time provided. See MinTimerKey for the characteristics of this timer. Returns a key that can be used to delete or modify the timer.

See also the timer_min! macro, which may be more convenient as it combines Core::timer_min_add and Core::timer_min_upd.

pub fn timer_min_upd(&mut self, key: MinTimerKey, expiry: Instant) -> bool

Update a “Min” timer with a new expiry time. It will be used as the new expiry time only if it is earlier than the current expiry time. This call is designed to be very cheap to call frequently, so long as the change is within the wiggle-room allowed. Otherwise it causes the working timer to be deleted and added again, readjusting the wiggle-room accordingly.

Returns true on success, false if timer no longer exists (i.e. it expired or was deleted)

See also the timer_min! macro, which may be more convenient as it combines Core::timer_min_add and Core::timer_min_upd.

pub fn timer_min_del(&mut self, key: MinTimerKey) -> bool

Delete a “Min” timer. Returns true on success, false if timer no longer exists (i.e. it expired or was deleted)

pub fn timer_min_active(&mut self, key: MinTimerKey) -> bool

Check whether a “Min” timer is active. Returns true if it exists and is active, false if it expired or was deleted or never existed

pub fn anymap_set<T: Clone + 'static>(&mut self, val: T)

Put a value into the anymap. This can be accessed using the Core::anymap_get or Core::anymap_try_get call. An anymap can store one value for each Rust type. The value must implement Clone, i.e. it must act something like an Rc or else be copyable data.

This is intended to be used for storing certain global instances which actors may need to get hold of, for example an access-point for the I/O poll implementation that Stakker is running under. In other words the anymap is intended to represent the environment.

There’s nothing I can do to stop you using this like an inefficient global variable store, but doing that would be a terrible idea. Using the anymap that way breaks the actor model and makes your code harder to reason about. Really it would be cleaner to use a Share if you need to break the actor model and share data, because at least then the interconnection between actors would be explicit, and trying to move an interconnected actor to a remote machine would fail immediately.

pub fn anymap_get<T: Clone + 'static>(&mut self) -> T

Gets a clone of a value from the Stakker anymap. This is intended to be used to access certain global instances, for example the I/O poll implementation that this Stakker is running inside. Panics if the value is not found.

pub fn anymap_try_get<T: Clone + 'static>(&mut self) -> Option<T>

Tries to get a clone of a value from the Stakker anymap. This is intended to be used to access certain global instances, for example the I/O poll implementation that this Stakker is running inside. Returns None if the value is missing.
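
The "one value per Rust type" rule of an anymap can be illustrated with a map keyed by `TypeId`. This is a generic std-only sketch of the technique, not Stakker's implementation. The `AnyMap` type and its method names are hypothetical, and unlike the real calls its getters take `&self`.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Toy anymap: at most one value per Rust type.
#[derive(Default)]
struct AnyMap {
    slots: HashMap<TypeId, Box<dyn Any>>,
}

impl AnyMap {
    /// Store `val`, replacing any earlier value of the same type.
    fn set<T: Clone + 'static>(&mut self, val: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(val));
    }

    /// Return a clone of the stored value of type `T`, if there is one.
    fn try_get<T: Clone + 'static>(&self) -> Option<T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|slot| slot.downcast_ref::<T>())
            .cloned()
    }

    /// Like `try_get`, but panics if nothing of type `T` was stored.
    fn get<T: Clone + 'static>(&self) -> T {
        self.try_get().expect("no value of this type in the anymap")
    }
}
```

Because values are handed out as clones, cheap-to-clone handles such as `Rc<T>` are the natural thing to store.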

pub fn shutdown(&mut self, cause: StopCause)

Request that the event loop terminate. For this to work, the event loop must check Core::not_shutdown each time through the loop. See also the ret_shutdown! macro which can be used as the StopCause handler for an actor, to shut down the event loop when that actor terminates. The event loop code can obtain the StopCause using Core::shutdown_reason.

pub fn not_shutdown(&self) -> bool

Should the event loop continue running? Returns true if there is no active shutdown in progress.

pub fn shutdown_reason(&mut self) -> Option<StopCause>

Get the reason for shutdown, if shutdown was requested. After calling this, the shutdown flag is cleared, i.e. Core::not_shutdown will return true and the event loop could continue to run.
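
The interplay of shutdown, not_shutdown and shutdown_reason amounts to an `Option` that is set once and then taken. The model below illustrates this; `EventLoop` and `Cause` are hypothetical stand-ins, and the real `StopCause` has its own set of variants.

```rust
/// Hypothetical stand-in for `StopCause`.
#[derive(Debug, PartialEq)]
enum Cause {
    Stopped,
    Failed(String),
}

/// Toy shutdown state following the description above.
#[derive(Default)]
struct EventLoop {
    shutdown: Option<Cause>,
}

impl EventLoop {
    /// Request termination, recording why.
    fn shutdown(&mut self, cause: Cause) {
        self.shutdown = Some(cause);
    }

    /// True while no shutdown request is outstanding.
    fn not_shutdown(&self) -> bool {
        self.shutdown.is_none()
    }

    /// Taking the reason clears the request, so the loop could run again.
    fn shutdown_reason(&mut self) -> Option<Cause> {
        self.shutdown.take()
    }
}
```

An event loop would typically read the reason exactly once, after its `while not_shutdown()` loop exits, since a second read returns `None`.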

pub fn deferrer(&self) -> Deferrer

Get a new Deferrer instance which can be used to defer calls to the main queue from contexts in the same thread which don’t have access to Core, for example drop handlers.

pub fn waker(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker

Register a wake handler callback, and obtain a Waker instance which can be passed to another thread. The wake handler will always be executed in the main thread. When Waker::wake is called in another thread, a wake-up is scheduled to occur in the main thread, using the wake-up mechanism provided by the I/O poller. Then when that wake-up is received, the corresponding wake handler is executed. Note that this is efficient – if many wake handlers are scheduled around the same time, they share the same main thread wake-up.

The wake handler is called in the main thread with arguments of (stakker, deleted). Note that there is a small chance of a spurious wake call happening occasionally, so the wake handler code must be ready for that. If deleted is true then the Waker was dropped, and this wake handler is also just about to be dropped.

This call panics if no I/O poller has yet set up a waker using Stakker::set_poll_waker.
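
The sharing of one main-thread wake-up between several wake handlers can be modelled with a mutex-protected list of pending handler slots. This is a sketch of the idea only: `WakeState`, `ToyWaker` and the free function `poll_wake` are hypothetical, and the counter merely records how often a real poller would have been interrupted.

```rust
use std::sync::{Arc, Mutex};

/// State shared between wakers (any thread) and the main thread.
#[derive(Default)]
struct WakeState {
    pending: Vec<usize>, // handler slots with a wake outstanding
    poll_wakes: usize,   // times the I/O poller had to be interrupted
}

/// Handle that may be sent to another thread; `slot` picks the handler.
#[derive(Clone)]
struct ToyWaker {
    slot: usize,
    state: Arc<Mutex<WakeState>>,
}

impl ToyWaker {
    fn wake(&self) {
        let mut st = self.state.lock().unwrap();
        // Only the first wake in a batch interrupts the poller.
        if st.pending.is_empty() {
            st.poll_wakes += 1;
        }
        // Repeated wakes for the same slot collapse into one.
        if !st.pending.contains(&self.slot) {
            st.pending.push(self.slot);
        }
    }
}

/// Main-thread side: take the batch of slots whose handlers should run.
fn poll_wake(state: &Mutex<WakeState>) -> Vec<usize> {
    let mut st = state.lock().unwrap();
    std::mem::take(&mut st.pending)
}
```

In this model three `wake` calls on two wakers produce a single poller interruption and a batch of two handler slots.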

pub fn share_rw2<'a, T, U>(&'a mut self, s1: &'a Share<T>, s2: &'a Share<U>) -> (&'a mut T, &'a mut U)

Borrow two Share instances mutably at the same time. This will panic if they are the same instance.

pub fn share_rw3<'a, T, U, V>(&'a mut self, s1: &'a Share<T>, s2: &'a Share<U>, s3: &'a Share<V>) -> (&'a mut T, &'a mut U, &'a mut V)

Borrow three Share instances mutably at the same time. This will panic if any two are the same instance.

pub fn log(&mut self, id: LogID, level: LogLevel, target: &str, fmt: Arguments<'_>, kvscan: impl Fn(&mut dyn LogVisitor))

Pass a log-record to the current logger, if one is active and if the log-level is enabled. Otherwise the call is ignored. id should be the logging-ID (obtained from actor.id() or cx.id(), or core.log_span_open() for non-actor spans) or 0 if the log-record doesn’t belong to any span. The arguments are used to form the LogRecord that is passed to the logger.

Normally you would use a logging macro which wraps this call, which would be provided by an external crate such as stakker_log.

This call does nothing unless the logger feature is enabled.

pub fn log_check(&self, level: LogLevel) -> bool

Check whether a log-record with the given LogLevel should be logged

pub fn log_span_open(&mut self, tag: &str, parent_id: LogID, kvscan: impl Fn(&mut dyn LogVisitor)) -> LogID

Allocate a new logging-ID and write a LogLevel::Open record to the logger. tag will be included as the record’s text, and should indicate what kind of span it is, e.g. the type name for an actor. The tag would not normally contain any dynamic information. If parent_id is non-zero, then a parent key will be added with that value. kvscan will be called to add any other key-value pairs as required, which is where the dynamic information should go.

This is used by actors on startup to allocate a logging-ID for the span of the actor’s lifetime. However other code that needs to do logging within a certain identifiable span can also make use of this call. The new span should be related to another span using parent_id (if possible), and Core::log_span_close should be called when the span is complete.

In the unlikely event that a program allocates 2^64 logging IDs, the IDs will wrap around to 1 again. If this is likely to cause a problem downstream, the logger implementation should detect this and warn or terminate as appropriate.

This call does nothing unless the logger feature is enabled.
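
The wrap-around rule for logging IDs can be sketched as a counter that skips 0, since 0 means "no span". `LogIds` is a hypothetical type used only to illustrate the arithmetic, with `u64` assumed as the ID type.

```rust
/// Toy logging-ID allocator: 0 is reserved for "no span", so after
/// `u64::MAX` the counter wraps to 1 rather than 0.
struct LogIds {
    last: u64,
}

impl LogIds {
    /// Return the next ID, never 0.
    fn alloc(&mut self) -> u64 {
        self.last = match self.last.wrapping_add(1) {
            0 => 1,
            id => id,
        };
        self.last
    }
}
```

A logger that must not confuse a recycled ID with a still-open span could watch for an ID that is smaller than the previous one it saw.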

pub fn log_span_close(&mut self, id: LogID, fmt: Arguments<'_>, kvscan: impl Fn(&mut dyn LogVisitor))

Write a LogLevel::Close record to the logger. fmt is a message which may give more information, e.g. the error message in the case of a failure. kvscan will be called to add key-value pairs to the record.

This call does nothing unless the logger feature is enabled.

pub fn access_core(&mut self) -> &mut Core

Used in macros to get a Core reference

pub fn access_deferrer(&self) -> &Deferrer

Used in macros to get a Deferrer reference

pub fn access_log_id(&self) -> LogID

Used in macros to get the LogID in case this is an actor or context. Since it isn’t, this call returns 0.

Trait Implementations

impl Deref for Stakker

type Target = Core

The resulting type after dereferencing.

fn deref(&self) -> &Core

Dereferences the value.

impl DerefMut for Stakker

fn deref_mut(&mut self) -> &mut Core

Mutably dereferences the value.

impl Drop for Stakker

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations

impl !RefUnwindSafe for Stakker

impl !Send for Stakker

impl !Sync for Stakker

impl Unpin for Stakker

impl !UnwindSafe for Stakker

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T where U: From<T>

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T where U: Into<T>

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.