Struct maelstrom::Runtime

source ·
pub struct Runtime { /* private fields */ }

Implementations§

source§

impl Runtime

source

pub fn init<F: Future>(future: F) -> F::Output

Examples found in repository?
examples/broadcast.rs (line 14)
13
14
15
pub(crate) fn main() -> Result<()> {
    Runtime::init(try_main())
}
More examples
Hide additional examples
examples/echo.rs (line 11)
10
11
12
pub(crate) fn main() -> Result<()> {
    Runtime::init(try_main())
}
examples/echo_failure.rs (line 13)
12
13
14
pub(crate) fn main() -> Result<()> {
    Runtime::init(try_main())
}
examples/g_set.rs (line 15)
14
15
16
pub(crate) fn main() -> Result<()> {
    Runtime::init(try_main())
}
examples/lin_kv.rs (line 14)
13
14
15
pub(crate) fn main() -> Result<()> {
    Runtime::init(try_main())
}
source§

impl Runtime

source

pub fn new() -> Self

Examples found in repository?
examples/broadcast.rs (line 19)
17
18
19
20
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
More examples
Hide additional examples
examples/echo.rs (line 16)
14
15
16
17
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/echo_failure.rs (line 18)
16
17
18
19
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/g_set.rs (line 19)
18
19
20
21
22
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(Handler::default());
    runtime.with_handler(handler).run().await
}
examples/lin_kv.rs (line 18)
17
18
19
20
21
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(handler(runtime.clone()));
    runtime.with_handler(handler).run().await
}
source

pub fn with_handler(self, handler: Arc<dyn Node + Send + Sync>) -> Self

Examples found in repository?
examples/broadcast.rs (line 19)
17
18
19
20
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
More examples
Hide additional examples
examples/echo.rs (line 16)
14
15
16
17
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/echo_failure.rs (line 18)
16
17
18
19
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/g_set.rs (line 21)
18
19
20
21
22
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(Handler::default());
    runtime.with_handler(handler).run().await
}
examples/lin_kv.rs (line 20)
17
18
19
20
21
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(handler(runtime.clone()));
    runtime.with_handler(handler).run().await
}
source

pub async fn send_raw(&self, msg: &str) -> Result<()>

source

pub fn send_async<T>(&self, to: impl Into<String>, message: T) -> Result<()> where T: Serialize + Send,

Examples found in repository?
examples/g_set.rs (line 64)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = to_seq(&self.s.lock().unwrap());
                return runtime.reply(req, Request::ReadOk { value: data }).await;
            }
            Ok(Request::Add { element }) => {
                self.s.lock().unwrap().insert(element);
                return runtime.reply(req, Request::AddOk {}).await;
            }
            Ok(Request::ReplicateOne { element }) => {
                self.s.lock().unwrap().insert(element);
                return Ok(());
            }
            Ok(Request::ReplicateFull { value }) => {
                let mut s = self.s.lock().unwrap();
                for v in value {
                    s.insert(v);
                }
                return Ok(());
            }
            Ok(Request::Init {}) => {
                // spawn into tokio (instead of runtime) to not to wait
                // until it is completed, as it will never be.
                let (r0, h0) = (runtime.clone(), self.clone());
                tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        debug!("emit replication signal");
                        let s = h0.s.lock().unwrap();
                        for n in r0.neighbours() {
                            let msg = Request::ReplicateFull { value: to_seq(&s) };
                            drop(r0.send_async(n, msg));
                        }
                    }
                });
                return Ok(());
            }
            _ => done(runtime, req),
        }
    }
source

pub async fn send<T>(&self, to: impl Into<String>, message: T) -> Result<()> where T: Serialize,

source

pub async fn send_back<T>(&self, req: Message, resp: T) -> Result<()> where T: Serialize,

source

pub async fn reply<T>(&self, req: Message, resp: T) -> Result<()> where T: Serialize,

Examples found in repository?
examples/echo.rs (line 27)
24
25
26
27
28
29
30
31
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        if req.get_type() == "echo" {
            let echo = req.body.clone().with_type("echo_ok");
            return runtime.reply(req, echo).await;
        }

        done(runtime, req)
    }
More examples
Hide additional examples
examples/echo_failure.rs (line 33)
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    async fn process(&self, runtime: Runtime, message: Message) -> Result<()> {
        if message.get_type() == "echo" {
            if self.inter.fetch_add(1, Ordering::SeqCst) > 0 {
                let err = maelstrom::Error::TemporarilyUnavailable {};
                let body = ErrorMessageBody::from_error(err);
                return runtime.reply(message, body).await;
            }

            let echo = format!("Another echo {}", message.body.msg_id);
            let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
            return runtime.reply(message, msg).await;
        }

        done(runtime, message)
    }
examples/lin_kv.rs (line 36)
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let (ctx, _handler) = Context::new();
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read { key }) => {
                let value = self.s.get(ctx, key.to_string()).await?;
                return runtime.reply(req, Request::ReadOk { value }).await;
            }
            Ok(Request::Write { key, value }) => {
                self.s.put(ctx, key.to_string(), value).await?;
                return runtime.reply(req, Request::WriteOk {}).await;
            }
            Ok(Request::Cas { key, from, to, put }) => {
                self.s.cas(ctx, key.to_string(), from, to, put).await?;
                return runtime.reply(req, Request::CasOk {}).await;
            }
            _ => done(runtime, req),
        }
    }
examples/broadcast.rs (line 41)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = self.snapshot();
                let msg = Request::ReadOk { messages: data };
                return runtime.reply(req, msg).await;
            }
            Ok(Request::Broadcast { message: element }) => {
                if self.try_add(element) {
                    info!("messages now {}", element);
                    for node in runtime.neighbours() {
                        runtime.call_async(node, Request::Broadcast { message: element });
                    }
                }

                return runtime.reply_ok(req).await;
            }
            Ok(Request::Topology { topology }) => {
                let neighbours = topology.get(runtime.node_id()).unwrap();
                self.inner.lock().unwrap().t = neighbours.clone();
                info!("My neighbors are {:?}", neighbours);
                return runtime.reply_ok(req).await;
            }
            _ => done(runtime, req),
        }
    }
examples/g_set.rs (line 36)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = to_seq(&self.s.lock().unwrap());
                return runtime.reply(req, Request::ReadOk { value: data }).await;
            }
            Ok(Request::Add { element }) => {
                self.s.lock().unwrap().insert(element);
                return runtime.reply(req, Request::AddOk {}).await;
            }
            Ok(Request::ReplicateOne { element }) => {
                self.s.lock().unwrap().insert(element);
                return Ok(());
            }
            Ok(Request::ReplicateFull { value }) => {
                let mut s = self.s.lock().unwrap();
                for v in value {
                    s.insert(v);
                }
                return Ok(());
            }
            Ok(Request::Init {}) => {
                // spawn into tokio (instead of runtime) to not to wait
                // until it is completed, as it will never be.
                let (r0, h0) = (runtime.clone(), self.clone());
                tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        debug!("emit replication signal");
                        let s = h0.s.lock().unwrap();
                        for n in r0.neighbours() {
                            let msg = Request::ReplicateFull { value: to_seq(&s) };
                            drop(r0.send_async(n, msg));
                        }
                    }
                });
                return Ok(());
            }
            _ => done(runtime, req),
        }
    }
source

pub async fn reply_ok(&self, req: Message) -> Result<()>

Examples found in repository?
examples/broadcast.rs (line 51)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = self.snapshot();
                let msg = Request::ReadOk { messages: data };
                return runtime.reply(req, msg).await;
            }
            Ok(Request::Broadcast { message: element }) => {
                if self.try_add(element) {
                    info!("messages now {}", element);
                    for node in runtime.neighbours() {
                        runtime.call_async(node, Request::Broadcast { message: element });
                    }
                }

                return runtime.reply_ok(req).await;
            }
            Ok(Request::Topology { topology }) => {
                let neighbours = topology.get(runtime.node_id()).unwrap();
                self.inner.lock().unwrap().t = neighbours.clone();
                info!("My neighbors are {:?}", neighbours);
                return runtime.reply_ok(req).await;
            }
            _ => done(runtime, req),
        }
    }
source

pub fn spawn<T>(&self, future: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static,

source

pub fn rpc<T>( &self, to: impl Into<String>, request: T ) -> impl Future<Output = Result<RPCResult>> where T: Serialize,

rpc() makes a remote call to another node via the message-passing interface. The provided context may serve as a timeout limiter. RPCResult is immediately canceled on drop.

Example:

use maelstrom::{Error, Result, Runtime};
use std::fmt::{Display, Formatter};
use serde::Serialize;
use serde::Deserialize;
use tokio_context::context::Context;

pub struct Storage {
    typ: &'static str,
    runtime: Runtime,
}

impl Storage {
    async fn get<T>(&self, ctx: Context, key: String) -> Result<T>
        where
            T: Deserialize<'static> + Send,
    {
        let req = Message::Read::<String> { key };
        let mut call = self.runtime.rpc(self.typ, req).await?;
        let msg = call.done_with(ctx).await?;
        let data = msg.body.as_obj::<Message<T>>()?;
        match data {
            Message::ReadOk { value } => Ok(value),
            _ => Err(Box::new(Error::Custom(
                -1,
                "kv: protocol violated".to_string(),
            ))),
        }
    }
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
enum Message<T> {
    Read {
        key: String,
    },
    ReadOk {
        value: T,
    },
}
source

pub async fn call<T>( &self, ctx: Context, to: impl Into<String>, request: T ) -> Result<Message> where T: Serialize,

call() is the same as `let _: Result<Message> = rpc().await?.done_with(ctx).await;`. For examples, see Runtime::rpc and RPCResult.

rpc() makes a remote call to another node via the message-passing interface. The provided context may serve as a timeout limiter. RPCResult is immediately canceled on drop.

source

pub fn call_async<T>(&self, to: impl Into<String>, request: T) where T: Serialize + 'static,

call_async() is equivalent to runtime.spawn(runtime.call(...)). See Runtime::call and Runtime::rpc.

Examples found in repository?
examples/broadcast.rs (line 47)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = self.snapshot();
                let msg = Request::ReadOk { messages: data };
                return runtime.reply(req, msg).await;
            }
            Ok(Request::Broadcast { message: element }) => {
                if self.try_add(element) {
                    info!("messages now {}", element);
                    for node in runtime.neighbours() {
                        runtime.call_async(node, Request::Broadcast { message: element });
                    }
                }

                return runtime.reply_ok(req).await;
            }
            Ok(Request::Topology { topology }) => {
                let neighbours = topology.get(runtime.node_id()).unwrap();
                self.inner.lock().unwrap().t = neighbours.clone();
                info!("My neighbors are {:?}", neighbours);
                return runtime.reply_ok(req).await;
            }
            _ => done(runtime, req),
        }
    }
source

pub fn node_id(&self) -> &str

Examples found in repository?
examples/broadcast.rs (line 54)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = self.snapshot();
                let msg = Request::ReadOk { messages: data };
                return runtime.reply(req, msg).await;
            }
            Ok(Request::Broadcast { message: element }) => {
                if self.try_add(element) {
                    info!("messages now {}", element);
                    for node in runtime.neighbours() {
                        runtime.call_async(node, Request::Broadcast { message: element });
                    }
                }

                return runtime.reply_ok(req).await;
            }
            Ok(Request::Topology { topology }) => {
                let neighbours = topology.get(runtime.node_id()).unwrap();
                self.inner.lock().unwrap().t = neighbours.clone();
                info!("My neighbors are {:?}", neighbours);
                return runtime.reply_ok(req).await;
            }
            _ => done(runtime, req),
        }
    }
source

pub fn nodes(&self) -> &[String]

source

pub fn set_membership_state(&self, state: MembershipState) -> Result<()>

source

pub async fn done(&self)

source

pub async fn run(&self) -> Result<()>

Examples found in repository?
examples/broadcast.rs (line 19)
17
18
19
20
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
More examples
Hide additional examples
examples/echo.rs (line 16)
14
15
16
17
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/echo_failure.rs (line 18)
16
17
18
19
async fn try_main() -> Result<()> {
    let handler = Arc::new(Handler::default());
    Runtime::new().with_handler(handler).run().await
}
examples/g_set.rs (line 21)
18
19
20
21
22
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(Handler::default());
    runtime.with_handler(handler).run().await
}
examples/lin_kv.rs (line 20)
17
18
19
20
21
async fn try_main() -> Result<()> {
    let runtime = Runtime::new();
    let handler = Arc::new(handler(runtime.clone()));
    runtime.with_handler(handler).run().await
}
source

pub async fn run_with<R>(&self, input: BufReader<R>) -> Result<()> where R: AsyncRead + Unpin,

source

pub fn next_msg_id(&self) -> u64

source

pub fn empty_response() -> Value

source

pub fn is_client(&self, src: &String) -> bool

source

pub fn is_from_cluster(&self, src: &String) -> bool

source

pub fn neighbours(&self) -> impl Iterator<Item = &String>

All nodes that are not this node.

Examples found in repository?
examples/broadcast.rs (line 46)
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = self.snapshot();
                let msg = Request::ReadOk { messages: data };
                return runtime.reply(req, msg).await;
            }
            Ok(Request::Broadcast { message: element }) => {
                if self.try_add(element) {
                    info!("messages now {}", element);
                    for node in runtime.neighbours() {
                        runtime.call_async(node, Request::Broadcast { message: element });
                    }
                }

                return runtime.reply_ok(req).await;
            }
            Ok(Request::Topology { topology }) => {
                let neighbours = topology.get(runtime.node_id()).unwrap();
                self.inner.lock().unwrap().t = neighbours.clone();
                info!("My neighbors are {:?}", neighbours);
                return runtime.reply_ok(req).await;
            }
            _ => done(runtime, req),
        }
    }
More examples
Hide additional examples
examples/g_set.rs (line 62)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
        let msg: Result<Request> = req.body.as_obj();
        match msg {
            Ok(Request::Read {}) => {
                let data = to_seq(&self.s.lock().unwrap());
                return runtime.reply(req, Request::ReadOk { value: data }).await;
            }
            Ok(Request::Add { element }) => {
                self.s.lock().unwrap().insert(element);
                return runtime.reply(req, Request::AddOk {}).await;
            }
            Ok(Request::ReplicateOne { element }) => {
                self.s.lock().unwrap().insert(element);
                return Ok(());
            }
            Ok(Request::ReplicateFull { value }) => {
                let mut s = self.s.lock().unwrap();
                for v in value {
                    s.insert(v);
                }
                return Ok(());
            }
            Ok(Request::Init {}) => {
                // spawn into tokio (instead of runtime) to not to wait
                // until it is completed, as it will never be.
                let (r0, h0) = (runtime.clone(), self.clone());
                tokio::spawn(async move {
                    loop {
                        tokio::time::sleep(Duration::from_secs(5)).await;
                        debug!("emit replication signal");
                        let s = h0.s.lock().unwrap();
                        for n in r0.neighbours() {
                            let msg = Request::ReplicateFull { value: to_seq(&s) };
                            drop(r0.send_async(n, msg));
                        }
                    }
                });
                return Ok(());
            }
            _ => done(runtime, req),
        }
    }

Trait Implementations§

source§

impl Clone for Runtime

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Default for Runtime

source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T where U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> ToOwned for T where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.