Struct apocalypse::Gate

source ·
pub struct Gate { /* private fields */ }
Expand description

§Gate structure

The portal structure allows communication with the demons, as well as demon spawning.

Gates are the main component of this library. They hold a channel of communication with their hell instance. It is important to notice that hell will continue to exist as long as any of the two following things are true

  • There are messages still in the queue to be processed
  • There is at least one gate alive

That is, dropping all gates finalizes hell’s execution. Due to the fact that a gate is required to send messages, and some Demons will have a gate among their fields, you have to remove all Demons in posession of a Gate to shutdown Hell gracefully. This structure cannot be created without the help of a Hell instance.

Implementations§

source§

impl Gate

source

pub async fn send<A: AsRef<Location<D>>, D, I, O>( &self, location: A, message: I ) -> Result<O, Error>
where D: Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send,

Sends a message to a demon

In this actor implementaton, all messages do have to return some kind of reply. Be aware that this decision can lead to lockups if used carelessly (as the mutable access that the handle function has to the demons blocks the message processing loop until each handle call ends). If you manage to create a message-cycle (that is, a chain of requests that has as element the same actor twice), then you will end up in a lockup situation. Try to use this function only when necessary, keep send_and_ignore as your first option, unless you carefully thought about the message-chains in your software.

use apocalypse::{Hell, Demon};

struct EchoBot;

impl Demon for EchoBot {
    type Input = &'static str;
    type Output = String;
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        message.to_string()
    }
}

let (gate, jh) = Hell::new().ignite().await.unwrap();
let location = gate.spawn(EchoBot).await.unwrap();
// Use the send function to send a message
let message = gate.send(&location, "Hallo, welt!").await.unwrap();
source

pub async fn send_and_ignore<D, I, O>( &self, location: &Location<D>, message: I ) -> Result<(), Error>
where D: Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send,

Sends a message to a demon, and ignore the result.

This is your go-to function when you don’t have to wait for the actor to give you a response back. This function fails if the request could not be delivered to the demon. If you absolutely require to call this function without awaiting, use tokio::spawn.

use apocalypse::{Hell, Demon};

struct PrintBot;

impl Demon for PrintBot {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) {
        println!("{}", message);
    }
}

let (gate, jh) = Hell::new().ignite().await.unwrap();
let location = gate.spawn(PrintBot).await.unwrap();
// Use the send and ignore function to send a message without waiting for it
gate.send_and_ignore(&location, "Hallo, welt!").await.unwrap();
source

pub async fn spawn<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send>( &self, demon: D ) -> Result<Location<D>, Error>

Spawns a demon in hell

use apocalypse::{Hell, Demon};

struct Basic;

impl Demon for Basic {
    type Input = String;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("Hello, world!");
    }
}

let (gate, join_handle) = Hell::new().ignite().await.unwrap();
// we spawn the demon
let _location = gate.spawn(Basic).await.unwrap();
// Do something
source

pub async fn spawn_multiple<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send, F: FnMut() -> D>( &self, demon_factory: F, replicas: usize ) -> Result<Location<D>, Error>

Spawns multiple demons in Hell, that reply to the same Location

This might be useful if you have one task that consumes some time to be processed, and you can also parallelize. The load balancing method is just using whichever Demon is free at the moment, in a sequential order (that is, sequential but skipping if one is busy).

use apocalypse::{Hell, Demon};

struct Basic;

impl Demon for Basic {
    type Input = String;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("Hello, world!");
    }
}

let (gate, join_handle) = Hell::new().ignite().await.unwrap();
// Demon factory, needs to implement FnMut
let basic_factory = || {
    Basic
};
// We spawn three instances of the demon
let _location = gate.spawn_multiple(basic_factory, 3).await.unwrap();
// Do something
source

pub async fn spawn_ws<D: 'static + Demon<Input = I, Output = O> + WebSocketThread, I: 'static + Send, O: 'static + Send>( &self, demon: D, wsr: WebSocketReader ) -> Result<Location<D>, Error>

Spawns a demon with websockets processing in hell

Demons spawned with this method need to implement the WebSocketThread trait. Demons will process both messages incoming from apocalypse, as well as from the websockets connection. It is important to note that the websockets handshake is not at all performed by this library.

use apocalypse::{Hell, Demon};
use cataclysm::ws::{WebSocketThread, Message};

struct PrintBot;

impl Demon for PrintBot {
    type Input = ();
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("Hello, world!");
    }
}
 
impl WebSocketThread for PrintBot {
    type Output = ();
    async fn on_message(&mut self, message: Message) {
        // ... do something with the message
    }

    async fn on_close(&mut self, _clean: bool) -> Self::Output {}
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let (gate, join_handle) = hell.ignite().await.unwrap();
    // In order to spawn, you should be able to obtain a
    // OwnedHalfRead tcp stream from tokio or similar
    // -> let _location = gate.spawn_ws(Basic{}, read_stream).await;
}
source

pub async fn vanquish<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send>( &self, location: &Location<D> ) -> Result<(), Error>

Get rid of one demon gracefully

With this method, you request one demon to be dropped. Notice that locations will not automatically reflect this change, and further messages sent to the dropped demon will return Error::InvalidLocation. This method with block until the demon confirms is no longer executing anything. There is no guarantee that all pending messages will be processed before termination.

If the hell instance has a default timeout for vanquishing demons, this function will return the latest at timeout. If you want to override this behaviour for a single call, see vanquish_with_timeout.

use apocalypse::{Hell, Demon};

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        // We spawn the echo demon
        let location = gate.spawn(EchoDemon{}).await.unwrap();
        // We wait until the demon fades away
        gate.vanquish(&location).await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn vanquish_with_timeout<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send>( &self, location: &Location<D>, timeout: Option<Duration> ) -> Result<(), Error>

Get rid of one demon gracefully

With this method, you request one demon to be dropped. Notice that locations will not automatically reflect this change, and further messages sent to the dropped demon will return Error::InvalidLocation. This method with block until the demon confirms is no longer executing anything. There is no guarantee that all pending messages will be processed before termination.

If the hell instance has a default timeout for vanquishing demons, this function will return the latest at timeout. If you want to override this behaviour for a single call, see vanquish_with_timeout.

use apocalypse::{Hell, Demon};
use std::time::Duration;

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        // The demon is slow, takes 2 seconds to reply
        tokio::time::sleep(Duration::from_secs(2)).await;
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        // We spawn the echo demon
        let location = gate.spawn(EchoDemon{}).await.unwrap();
        // We wait maximum 1 second for the demon to finish whatever it is doing
        gate.vanquish_with_timeout(&location, Some(Duration::from_secs(1))).await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn vanquish_and_ignore<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send>( &self, location: &Location<D> ) -> Result<(), Error>

Get rid of one demon gracefully, and ignore the result

As with send and send_and_ignore, this method is prefered because there is a lower chance of a lockup happening. For example, if you were to allow your own demon to vanquish itself, you should use this method. The method fails if the vanquish request does not reach the demon. If you absolutely require to call this function without awaiting, use tokio::spawn.

If the hell instance has a default timeout for vanquishing demons, this function will return the latest at timeout. If you want to override this behaviour for a single call, see vanquish_and_ignore_with_timeout.

use apocalypse::{Hell, Demon};

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        // We spawn the echo demon
        let location = gate.spawn(EchoDemon{}).await.unwrap();
        // This function fails if the vanquish request does not reach the demon, but if
        // if it does, it does not block.
        gate.vanquish_and_ignore(&location).await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn vanquish_and_ignore_with_timeout<D: 'static + Demon<Input = I, Output = O>, I: 'static + Send, O: 'static + Send>( &self, location: &Location<D>, timeout: Option<Duration> ) -> Result<(), Error>

Get rid of one demon, and ignore the result.

As with send and send_and_ignore, this method is prefered because there is a lower chance of a lockup happening. For example, if you were to allow your own demon to vanquish itself, you should use this method. If you absolutely require to call this function without awaiting, use tokio::spawn.

use apocalypse::{Hell, Demon};
use std::time::Duration;

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        // The demon is slow, takes 2 seconds to reply
        tokio::time::sleep(Duration::from_secs(2)).await;
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        // We spawn the echo demon
        let location = gate.spawn(EchoDemon{}).await.unwrap();
        // We wait maximum 1 second for the demon to finish whatever it is doing
        gate.vanquish_and_ignore_with_timeout(&location, Some(Duration::from_secs(1))).await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn extinguish(self) -> Result<(), Error>

Stops the broker

By default, the timeout will be used (if set) to put a maximum wait time for all remaining demons to finalize. You can override the behaviour for this function by using the extinguish_with_timeout

use apocalypse::{Hell, Demon};

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        gate.extinguish().await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn extinguish_with_timeout( self, timeout: Option<Duration> ) -> Result<(), Error>

Stops the broker

Same as extinguish, but with an override to the timeout parameter

use apocalypse::{Hell, Demon};

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        gate.extinguish_with_timeout(Some(std::time::Duration::from_secs(2))).await.unwrap();
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}
source

pub async fn stats(&self) -> Result<HellStats, Error>

Requests hell statistics

This method returns a structure containing operation stats.

use apocalypse::{Hell, Demon};

struct EchoDemon{}

impl Demon for EchoDemon {
    type Input = &'static str;
    type Output = ();
    async fn handle(&mut self, message: Self::Input) -> Self::Output {
        println!("{}", message);
    }
}

#[tokio::main]
async fn main() {
    let hell = Hell::new();
    let join_handle = {
        // We ignite our hell instance in another span to guarantee our gate is dropped after use
        let (gate, join_handle) = hell.ignite().await.unwrap();
        println!("{:?}", gate.stats().await.unwrap());
        join_handle
    };
    // We await the system
    join_handle.await.unwrap();
}

Trait Implementations§

source§

impl Clone for Gate

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl !RefUnwindSafe for Gate

§

impl Send for Gate

§

impl Sync for Gate

§

impl Unpin for Gate

§

impl !UnwindSafe for Gate

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.