Module waitset

Module waitset 

Source
Expand description

Event handling mechanism to wait on multiple Listeners in one call, realizing the reactor pattern. (Event multiplexer) A WaitSet is an implementation of an event multiplexer (Reactor of the reactor design pattern). It allows the user to attach notifications, deadlines or intervals.

  • Notification - An object that emits an event. Whenever the event is detected the WaitSet wakes up and informs the user. Typical use case are gateways, which receives and forwards data whenever new data is available.
  • Deadline - Like a Notification with the exception that the Deadline expects an event after a certain predefined timeout. If the event does not arrive before the timeout has passed, the WaitSet wakes up and informs the user that th Deadline has missed its timeout. Whenever a Deadline receives an event, the timeout is reset. One example is a sensor that shall send an update every 100ms and the applications requires the sensor data latest after 120ms. If after 120ms an update is not available the application must wake up and take counter measures. If the update arrives within the timeout, the timeout is reset back to 120ms.
  • Interval - An time period after which the WaitSet wakes up and informs the user that the time has passed by. This is useful when a Publisher shall send an heartbeat every 100ms.

The WaitSet allows the user to attach multiple Listener from multiple Nodes, anything that implements SynchronousMultiplexing with timeouts (Deadline) or without them (Notification). Additional, an arbitrary amount of intervals (Ticks) can be attached.

§Example

§Notification

use iceoryx2::prelude::*;

let mut listener = event.listener_builder().create()?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let guard = waitset.attach_notification(&listener)?;

let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
    if attachment_id.has_event_from(&guard) {
        while let Ok(Some(event_id)) = listener.try_wait_one() {
            println!("received notification {:?}", event_id);
        }
    }
    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;

§Deadline

use iceoryx2::prelude::*;

let mut listener = event.listener_builder().create()?;

let listener_deadline = Duration::from_secs(1);
let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let guard = waitset.attach_deadline(&listener, listener_deadline)?;

let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
    if attachment_id.has_event_from(&guard) {
        while let Ok(Some(event_id)) = listener.try_wait_one() {
            println!("received notification {:?}", event_id);
        }
    } else if attachment_id.has_missed_deadline(&guard) {
        println!("Oh no, we hit the deadline without receiving any kind of event");
    }
    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;

§Tick

use iceoryx2::prelude::*;

let publisher_1 = pubsub.publisher_builder().create()?;
let publisher_2 = pubsub.publisher_builder().create()?;

let pub_1_period = Duration::from_millis(250);
let pub_2_period = Duration::from_millis(718);

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;
let guard_1 = waitset.attach_interval(pub_1_period)?;
let guard_2 = waitset.attach_interval(pub_2_period)?;

let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
    if attachment_id.has_event_from(&guard_1) {
        publisher_1.send_copy(123);
    } else if attachment_id.has_event_from(&guard_2) {
        publisher_2.send_copy(456);
    }
    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;

§HashMap approach

use iceoryx2::prelude::*;
use std::collections::HashMap;
use iceoryx2::port::listener::Listener;

let listener_1 = event_1.listener_builder().create()?;
let listener_2 = event_2.listener_builder().create()?;

let mut listeners: HashMap<WaitSetAttachmentId<ipc::Service>, &Listener<ipc::Service>> = HashMap::new();
let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;

// attach all listeners to the waitset
let guard_1 = waitset.attach_notification(&listener_1)?;
let guard_2 = waitset.attach_notification(&listener_2)?;
listeners.insert(WaitSetAttachmentId::from_guard(&guard_1), &listener_1);
listeners.insert(WaitSetAttachmentId::from_guard(&guard_2), &listener_2);

let on_event = |attachment_id| {
    if let Some(listener) = listeners.get(&attachment_id) {
        while let Ok(Some(event_id)) = listener.try_wait_one() {
            println!("received notification {:?}", event_id);
        }
    }
    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;

§Using WaitSet Without Signal Handling

This example demonstrates how the WaitSet can be used when system signals are being handled elsewhere. The builder parameter WaitSetBuilder::signal_handling_mode() can be used to disable signal handling in all WaitSet calls like WaitSet::wait_and_process() or WaitSet::wait_and_process_once().

use iceoryx2::prelude::*;

let waitset = WaitSetBuilder::new()
                .signal_handling_mode(SignalHandlingMode::Disabled)
                .create::<ipc::Service>()?;

let on_event = |_| {
    // your event handling
    CallbackProgression::Continue
};

waitset.wait_and_process(on_event)?;

Structs§

WaitSet
The WaitSet implements a reactor pattern and allows to wait on multiple events in one single call WaitSet::wait_and_process_once() until it wakes up or to run repeatedly with WaitSet::wait_and_process() until the a interrupt or termination signal was received or the user has explicitly requested to stop by returning CallbackProgression::Stop in the provided callback.
WaitSetAttachmentId
Represents an attachment to the WaitSet
WaitSetBuilder
The builder for the WaitSet.
WaitSetGuard
Is returned when something is attached to the WaitSet. As soon as it goes out of scope, the attachment is detached.

Enums§

WaitSetAttachmentError
Defines the failures that can occur when attaching something with WaitSet::attach_notification(), WaitSet::attach_interval() or WaitSet::attach_deadline().
WaitSetCreateError
Defines the failures that can occur when calling WaitSetBuilder::create().
WaitSetRunError
Defines the failures that can occur when calling WaitSet::wait_and_process().
WaitSetRunResult
States why the WaitSet::wait_and_process() method returned.