Struct Runtime

Source
pub struct Runtime { /* private fields */ }

Implementations§

Source§

impl Runtime

Source

pub fn init<F: Future>(future: F) -> F::Output

Examples found in repository?
examples/broadcast.rs (line 14)
13pub(crate) fn main() -> Result<()> {
14    Runtime::init(try_main())
15}
More examples
Hide additional examples
examples/echo.rs (line 11)
10pub(crate) fn main() -> Result<()> {
11    Runtime::init(try_main())
12}
examples/echo_failure.rs (line 13)
12pub(crate) fn main() -> Result<()> {
13    Runtime::init(try_main())
14}
examples/g_set.rs (line 15)
14pub(crate) fn main() -> Result<()> {
15    Runtime::init(try_main())
16}
examples/lin_kv.rs (line 14)
13pub(crate) fn main() -> Result<()> {
14    Runtime::init(try_main())
15}
Source§

impl Runtime

Source

pub fn new() -> Self

Examples found in repository?
examples/broadcast.rs (line 19)
17async fn try_main() -> Result<()> {
18    let handler = Arc::new(Handler::default());
19    Runtime::new().with_handler(handler).run().await
20}
More examples
Hide additional examples
examples/echo.rs (line 16)
14async fn try_main() -> Result<()> {
15    let handler = Arc::new(Handler::default());
16    Runtime::new().with_handler(handler).run().await
17}
examples/echo_failure.rs (line 18)
16async fn try_main() -> Result<()> {
17    let handler = Arc::new(Handler::default());
18    Runtime::new().with_handler(handler).run().await
19}
examples/g_set.rs (line 19)
18async fn try_main() -> Result<()> {
19    let runtime = Runtime::new();
20    let handler = Arc::new(Handler::default());
21    runtime.with_handler(handler).run().await
22}
examples/lin_kv.rs (line 18)
17async fn try_main() -> Result<()> {
18    let runtime = Runtime::new();
19    let handler = Arc::new(handler(runtime.clone()));
20    runtime.with_handler(handler).run().await
21}
Source

pub fn with_handler(self, handler: Arc<dyn Node + Send + Sync>) -> Self

Examples found in repository?
examples/broadcast.rs (line 19)
17async fn try_main() -> Result<()> {
18    let handler = Arc::new(Handler::default());
19    Runtime::new().with_handler(handler).run().await
20}
More examples
Hide additional examples
examples/echo.rs (line 16)
14async fn try_main() -> Result<()> {
15    let handler = Arc::new(Handler::default());
16    Runtime::new().with_handler(handler).run().await
17}
examples/echo_failure.rs (line 18)
16async fn try_main() -> Result<()> {
17    let handler = Arc::new(Handler::default());
18    Runtime::new().with_handler(handler).run().await
19}
examples/g_set.rs (line 21)
18async fn try_main() -> Result<()> {
19    let runtime = Runtime::new();
20    let handler = Arc::new(Handler::default());
21    runtime.with_handler(handler).run().await
22}
examples/lin_kv.rs (line 20)
17async fn try_main() -> Result<()> {
18    let runtime = Runtime::new();
19    let handler = Arc::new(handler(runtime.clone()));
20    runtime.with_handler(handler).run().await
21}
Source

pub async fn send_raw(&self, msg: &str) -> Result<()>

Source

pub fn send_async<T>(&self, to: impl Into<String>, message: T) -> Result<()>
where T: Serialize + Send,

Examples found in repository?
examples/g_set.rs (line 64)
31    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32        let msg: Result<Request> = req.body.as_obj();
33        match msg {
34            Ok(Request::Read {}) => {
35                let data = to_seq(&self.s.lock().unwrap());
36                return runtime.reply(req, Request::ReadOk { value: data }).await;
37            }
38            Ok(Request::Add { element }) => {
39                self.s.lock().unwrap().insert(element);
40                return runtime.reply(req, Request::AddOk {}).await;
41            }
42            Ok(Request::ReplicateOne { element }) => {
43                self.s.lock().unwrap().insert(element);
44                return Ok(());
45            }
46            Ok(Request::ReplicateFull { value }) => {
47                let mut s = self.s.lock().unwrap();
48                for v in value {
49                    s.insert(v);
50                }
51                return Ok(());
52            }
53            Ok(Request::Init {}) => {
54                // spawn into tokio (instead of runtime) to not to wait
55                // until it is completed, as it will never be.
56                let (r0, h0) = (runtime.clone(), self.clone());
57                tokio::spawn(async move {
58                    loop {
59                        tokio::time::sleep(Duration::from_secs(5)).await;
60                        debug!("emit replication signal");
61                        let s = h0.s.lock().unwrap();
62                        for n in r0.neighbours() {
63                            let msg = Request::ReplicateFull { value: to_seq(&s) };
64                            drop(r0.send_async(n, msg));
65                        }
66                    }
67                });
68                return Ok(());
69            }
70            _ => done(runtime, req),
71        }
72    }
Source

pub async fn send<T>(&self, to: impl Into<String>, message: T) -> Result<()>
where T: Serialize,

Source

pub async fn send_back<T>(&self, req: Message, resp: T) -> Result<()>
where T: Serialize,

Source

pub async fn reply<T>(&self, req: Message, resp: T) -> Result<()>
where T: Serialize,

Examples found in repository?
examples/echo.rs (line 27)
24    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
25        if req.get_type() == "echo" {
26            let echo = req.body.clone().with_type("echo_ok");
27            return runtime.reply(req, echo).await;
28        }
29
30        done(runtime, req)
31    }
More examples
Hide additional examples
examples/echo_failure.rs (line 33)
28    async fn process(&self, runtime: Runtime, message: Message) -> Result<()> {
29        if message.get_type() == "echo" {
30            if self.inter.fetch_add(1, Ordering::SeqCst) > 0 {
31                let err = maelstrom::Error::TemporarilyUnavailable {};
32                let body = ErrorMessageBody::from_error(err);
33                return runtime.reply(message, body).await;
34            }
35
36            let echo = format!("Another echo {}", message.body.msg_id);
37            let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
38            return runtime.reply(message, msg).await;
39        }
40
41        done(runtime, message)
42    }
examples/lin_kv.rs (line 36)
30    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
31        let (ctx, _handler) = Context::new();
32        let msg: Result<Request> = req.body.as_obj();
33        match msg {
34            Ok(Request::Read { key }) => {
35                let value = self.s.get(ctx, key.to_string()).await?;
36                return runtime.reply(req, Request::ReadOk { value }).await;
37            }
38            Ok(Request::Write { key, value }) => {
39                self.s.put(ctx, key.to_string(), value).await?;
40                return runtime.reply(req, Request::WriteOk {}).await;
41            }
42            Ok(Request::Cas { key, from, to, put }) => {
43                self.s.cas(ctx, key.to_string(), from, to, put).await?;
44                return runtime.reply(req, Request::CasOk {}).await;
45            }
46            _ => done(runtime, req),
47        }
48    }
examples/broadcast.rs (line 41)
35    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36        let msg: Result<Request> = req.body.as_obj();
37        match msg {
38            Ok(Request::Read {}) => {
39                let data = self.snapshot();
40                let msg = Request::ReadOk { messages: data };
41                return runtime.reply(req, msg).await;
42            }
43            Ok(Request::Broadcast { message: element }) => {
44                if self.try_add(element) {
45                    info!("messages now {}", element);
46                    for node in runtime.neighbours() {
47                        runtime.call_async(node, Request::Broadcast { message: element });
48                    }
49                }
50
51                return runtime.reply_ok(req).await;
52            }
53            Ok(Request::Topology { topology }) => {
54                let neighbours = topology.get(runtime.node_id()).unwrap();
55                self.inner.lock().unwrap().t = neighbours.clone();
56                info!("My neighbors are {:?}", neighbours);
57                return runtime.reply_ok(req).await;
58            }
59            _ => done(runtime, req),
60        }
61    }
examples/g_set.rs (line 36)
31    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32        let msg: Result<Request> = req.body.as_obj();
33        match msg {
34            Ok(Request::Read {}) => {
35                let data = to_seq(&self.s.lock().unwrap());
36                return runtime.reply(req, Request::ReadOk { value: data }).await;
37            }
38            Ok(Request::Add { element }) => {
39                self.s.lock().unwrap().insert(element);
40                return runtime.reply(req, Request::AddOk {}).await;
41            }
42            Ok(Request::ReplicateOne { element }) => {
43                self.s.lock().unwrap().insert(element);
44                return Ok(());
45            }
46            Ok(Request::ReplicateFull { value }) => {
47                let mut s = self.s.lock().unwrap();
48                for v in value {
49                    s.insert(v);
50                }
51                return Ok(());
52            }
53            Ok(Request::Init {}) => {
54                // spawn into tokio (instead of runtime) to not to wait
55                // until it is completed, as it will never be.
56                let (r0, h0) = (runtime.clone(), self.clone());
57                tokio::spawn(async move {
58                    loop {
59                        tokio::time::sleep(Duration::from_secs(5)).await;
60                        debug!("emit replication signal");
61                        let s = h0.s.lock().unwrap();
62                        for n in r0.neighbours() {
63                            let msg = Request::ReplicateFull { value: to_seq(&s) };
64                            drop(r0.send_async(n, msg));
65                        }
66                    }
67                });
68                return Ok(());
69            }
70            _ => done(runtime, req),
71        }
72    }
Source

pub async fn reply_ok(&self, req: Message) -> Result<()>

Examples found in repository?
examples/broadcast.rs (line 51)
35    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36        let msg: Result<Request> = req.body.as_obj();
37        match msg {
38            Ok(Request::Read {}) => {
39                let data = self.snapshot();
40                let msg = Request::ReadOk { messages: data };
41                return runtime.reply(req, msg).await;
42            }
43            Ok(Request::Broadcast { message: element }) => {
44                if self.try_add(element) {
45                    info!("messages now {}", element);
46                    for node in runtime.neighbours() {
47                        runtime.call_async(node, Request::Broadcast { message: element });
48                    }
49                }
50
51                return runtime.reply_ok(req).await;
52            }
53            Ok(Request::Topology { topology }) => {
54                let neighbours = topology.get(runtime.node_id()).unwrap();
55                self.inner.lock().unwrap().t = neighbours.clone();
56                info!("My neighbors are {:?}", neighbours);
57                return runtime.reply_ok(req).await;
58            }
59            _ => done(runtime, req),
60        }
61    }
Source

pub fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
where T: Future + Send + 'static, T::Output: Send + 'static,

Source

pub fn rpc<T>( &self, to: impl Into<String>, request: T, ) -> impl Future<Output = Result<RPCResult>>
where T: Serialize,

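rpc() makes a remote call to another node via the message-passing interface. rpc() itself takes no context; a context passed to the returned RPCResult (e.g. done_with(ctx), as in the example below) may serve as a timeout limiter. The RPCResult is immediately canceled on drop.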

Example:

use maelstrom::{Error, Result, Runtime};
use std::fmt::{Display, Formatter};
use serde::Serialize;
use serde::Deserialize;
use tokio_context::context::Context;

pub struct Storage {
    typ: &'static str,
    runtime: Runtime,
}

impl Storage {
    async fn get<T>(&self, ctx: Context, key: String) -> Result<T>
        where
            T: Deserialize<'static> + Send,
    {
        let req = Message::Read::<String> { key };
        let mut call = self.runtime.rpc(self.typ, req).await?;
        let msg = call.done_with(ctx).await?;
        let data = msg.body.as_obj::<Message<T>>()?;
        match data {
            Message::ReadOk { value } => Ok(value),
            _ => Err(Box::new(Error::Custom(
                -1,
                "kv: protocol violated".to_string(),
            ))),
        }
    }
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
enum Message<T> {
    Read {
        key: String,
    },
    ReadOk {
        value: T,
    },
}
Source

pub async fn call<T>( &self, ctx: Context, to: impl Into<String>, request: T, ) -> Result<Message>
where T: Serialize,

call() is the same as `let _: Result<Message> = rpc().await?.done_with(ctx).await;`. For examples, see Runtime::rpc and RPCResult.

rpc() makes a remote call to another node via the message-passing interface. The provided context may serve as a timeout limiter. The RPCResult is immediately canceled on drop.

Source

pub fn call_async<T>(&self, to: impl Into<String>, request: T)
where T: Serialize + 'static,

call_async() is equivalent to runtime.spawn(runtime.call(...)). See Runtime::call and Runtime::rpc.

Examples found in repository?
examples/broadcast.rs (line 47)
35    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36        let msg: Result<Request> = req.body.as_obj();
37        match msg {
38            Ok(Request::Read {}) => {
39                let data = self.snapshot();
40                let msg = Request::ReadOk { messages: data };
41                return runtime.reply(req, msg).await;
42            }
43            Ok(Request::Broadcast { message: element }) => {
44                if self.try_add(element) {
45                    info!("messages now {}", element);
46                    for node in runtime.neighbours() {
47                        runtime.call_async(node, Request::Broadcast { message: element });
48                    }
49                }
50
51                return runtime.reply_ok(req).await;
52            }
53            Ok(Request::Topology { topology }) => {
54                let neighbours = topology.get(runtime.node_id()).unwrap();
55                self.inner.lock().unwrap().t = neighbours.clone();
56                info!("My neighbors are {:?}", neighbours);
57                return runtime.reply_ok(req).await;
58            }
59            _ => done(runtime, req),
60        }
61    }
Source

pub fn node_id(&self) -> &str

Examples found in repository?
examples/broadcast.rs (line 54)
35    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36        let msg: Result<Request> = req.body.as_obj();
37        match msg {
38            Ok(Request::Read {}) => {
39                let data = self.snapshot();
40                let msg = Request::ReadOk { messages: data };
41                return runtime.reply(req, msg).await;
42            }
43            Ok(Request::Broadcast { message: element }) => {
44                if self.try_add(element) {
45                    info!("messages now {}", element);
46                    for node in runtime.neighbours() {
47                        runtime.call_async(node, Request::Broadcast { message: element });
48                    }
49                }
50
51                return runtime.reply_ok(req).await;
52            }
53            Ok(Request::Topology { topology }) => {
54                let neighbours = topology.get(runtime.node_id()).unwrap();
55                self.inner.lock().unwrap().t = neighbours.clone();
56                info!("My neighbors are {:?}", neighbours);
57                return runtime.reply_ok(req).await;
58            }
59            _ => done(runtime, req),
60        }
61    }
Source

pub fn nodes(&self) -> &[String]

Source

pub fn set_membership_state(&self, state: MembershipState) -> Result<()>

Source

pub async fn done(&self)

Source

pub async fn run(&self) -> Result<()>

Examples found in repository?
examples/broadcast.rs (line 19)
17async fn try_main() -> Result<()> {
18    let handler = Arc::new(Handler::default());
19    Runtime::new().with_handler(handler).run().await
20}
More examples
Hide additional examples
examples/echo.rs (line 16)
14async fn try_main() -> Result<()> {
15    let handler = Arc::new(Handler::default());
16    Runtime::new().with_handler(handler).run().await
17}
examples/echo_failure.rs (line 18)
16async fn try_main() -> Result<()> {
17    let handler = Arc::new(Handler::default());
18    Runtime::new().with_handler(handler).run().await
19}
examples/g_set.rs (line 21)
18async fn try_main() -> Result<()> {
19    let runtime = Runtime::new();
20    let handler = Arc::new(Handler::default());
21    runtime.with_handler(handler).run().await
22}
examples/lin_kv.rs (line 20)
17async fn try_main() -> Result<()> {
18    let runtime = Runtime::new();
19    let handler = Arc::new(handler(runtime.clone()));
20    runtime.with_handler(handler).run().await
21}
Source

pub async fn run_with<R>(&self, input: BufReader<R>) -> Result<()>
where R: AsyncRead + Unpin,

Source

pub fn next_msg_id(&self) -> u64

Source

pub fn empty_response() -> Value

Source

pub fn is_client(&self, src: &String) -> bool

Source

pub fn is_from_cluster(&self, src: &String) -> bool

Source

pub fn neighbours(&self) -> impl Iterator<Item = &String>

All nodes that are not this node.

Examples found in repository?
examples/broadcast.rs (line 46)
35    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36        let msg: Result<Request> = req.body.as_obj();
37        match msg {
38            Ok(Request::Read {}) => {
39                let data = self.snapshot();
40                let msg = Request::ReadOk { messages: data };
41                return runtime.reply(req, msg).await;
42            }
43            Ok(Request::Broadcast { message: element }) => {
44                if self.try_add(element) {
45                    info!("messages now {}", element);
46                    for node in runtime.neighbours() {
47                        runtime.call_async(node, Request::Broadcast { message: element });
48                    }
49                }
50
51                return runtime.reply_ok(req).await;
52            }
53            Ok(Request::Topology { topology }) => {
54                let neighbours = topology.get(runtime.node_id()).unwrap();
55                self.inner.lock().unwrap().t = neighbours.clone();
56                info!("My neighbors are {:?}", neighbours);
57                return runtime.reply_ok(req).await;
58            }
59            _ => done(runtime, req),
60        }
61    }
More examples
Hide additional examples
examples/g_set.rs (line 62)
31    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32        let msg: Result<Request> = req.body.as_obj();
33        match msg {
34            Ok(Request::Read {}) => {
35                let data = to_seq(&self.s.lock().unwrap());
36                return runtime.reply(req, Request::ReadOk { value: data }).await;
37            }
38            Ok(Request::Add { element }) => {
39                self.s.lock().unwrap().insert(element);
40                return runtime.reply(req, Request::AddOk {}).await;
41            }
42            Ok(Request::ReplicateOne { element }) => {
43                self.s.lock().unwrap().insert(element);
44                return Ok(());
45            }
46            Ok(Request::ReplicateFull { value }) => {
47                let mut s = self.s.lock().unwrap();
48                for v in value {
49                    s.insert(v);
50                }
51                return Ok(());
52            }
53            Ok(Request::Init {}) => {
54                // spawn into tokio (instead of runtime) to not to wait
55                // until it is completed, as it will never be.
56                let (r0, h0) = (runtime.clone(), self.clone());
57                tokio::spawn(async move {
58                    loop {
59                        tokio::time::sleep(Duration::from_secs(5)).await;
60                        debug!("emit replication signal");
61                        let s = h0.s.lock().unwrap();
62                        for n in r0.neighbours() {
63                            let msg = Request::ReplicateFull { value: to_seq(&s) };
64                            drop(r0.send_async(n, msg));
65                        }
66                    }
67                });
68                return Ok(());
69            }
70            _ => done(runtime, req),
71        }
72    }

Trait Implementations§

Source§

impl Clone for Runtime

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

const fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Default for Runtime

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.