pub struct Runtime { /* private fields */ }
Implementations§
source§impl Runtime
impl Runtime
sourcepub fn new() -> Self
pub fn new() -> Self
Examples found in repository?
More examples
sourcepub fn with_handler(self, handler: Arc<dyn Node + Send + Sync>) -> Self
pub fn with_handler(self, handler: Arc<dyn Node + Send + Sync>) -> Self
Examples found in repository?
More examples
pub async fn send_raw(&self, msg: &str) -> Result<()>
sourcepub fn send_async<T>(&self, to: impl Into<String>, message: T) -> Result<()>where
T: Serialize + Send,
pub fn send_async<T>(&self, to: impl Into<String>, message: T) -> Result<()>where T: Serialize + Send,
Examples found in repository?
examples/g_set.rs (line 64)
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/// Dispatches one incoming message of the g-set example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `to_seq` of the locked `self.s`.
/// * `Request::Add` — inserts the element and replies with `AddOk`.
/// * `Request::ReplicateOne` / `Request::ReplicateFull` — insert the received
///   element(s); no reply is sent.
/// * `Request::Init` — starts a detached periodic replication task.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let value = to_seq(&self.s.lock().unwrap());
            runtime.reply(req, Request::ReadOk { value }).await
        }
        Ok(Request::Add { element }) => {
            self.s.lock().unwrap().insert(element);
            runtime.reply(req, Request::AddOk {}).await
        }
        Ok(Request::ReplicateOne { element }) => {
            self.s.lock().unwrap().insert(element);
            Ok(())
        }
        Ok(Request::ReplicateFull { value }) => {
            let mut guard = self.s.lock().unwrap();
            value.into_iter().for_each(|item| {
                guard.insert(item);
            });
            Ok(())
        }
        Ok(Request::Init {}) => {
            // Spawn directly on tokio (rather than through the runtime) so that
            // nothing waits for this task to complete: it loops forever.
            let (rt, node) = (runtime.clone(), self.clone());
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    debug!("emit replication signal");
                    // The lock is taken after the sleep and released at the end
                    // of each iteration, so it is never held across an await.
                    let guard = node.s.lock().unwrap();
                    for peer in rt.neighbours() {
                        // Best effort: the send result is intentionally discarded.
                        let _ = rt.send_async(peer, Request::ReplicateFull { value: to_seq(&guard) });
                    }
                }
            });
            Ok(())
        }
        _ => done(runtime, req),
    }
}
pub async fn send<T>(&self, to: impl Into<String>, message: T) -> Result<()>where T: Serialize,
pub async fn send_back<T>(&self, req: Message, resp: T) -> Result<()>where T: Serialize,
sourcepub async fn reply<T>(&self, req: Message, resp: T) -> Result<()>where
T: Serialize,
pub async fn reply<T>(&self, req: Message, resp: T) -> Result<()>where T: Serialize,
Examples found in repository?
More examples
examples/echo_failure.rs (line 33)
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
async fn process(&self, runtime: Runtime, message: Message) -> Result<()> {
if message.get_type() == "echo" {
if self.inter.fetch_add(1, Ordering::SeqCst) > 0 {
let err = maelstrom::Error::TemporarilyUnavailable {};
let body = ErrorMessageBody::from_error(err);
return runtime.reply(message, body).await;
}
let echo = format!("Another echo {}", message.body.msg_id);
let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
return runtime.reply(message, msg).await;
}
done(runtime, message)
}
examples/lin_kv.rs (line 36)
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
/// Handles one message of the lin-kv example node.
///
/// `Read`, `Write` and `Cas` requests are forwarded to the storage behind
/// `self.s` using a fresh context; a storage error is propagated with `?`,
/// otherwise the matching `*Ok` response is sent back. Anything else
/// (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    // `_handler` is kept bound until the end of the function.
    let (ctx, _handler) = Context::new();
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read { key }) => {
            let storage_key = key.to_string();
            let value = self.s.get(ctx, storage_key).await?;
            runtime.reply(req, Request::ReadOk { value }).await
        }
        Ok(Request::Write { key, value }) => {
            let storage_key = key.to_string();
            self.s.put(ctx, storage_key, value).await?;
            runtime.reply(req, Request::WriteOk {}).await
        }
        Ok(Request::Cas { key, from, to, put }) => {
            let storage_key = key.to_string();
            self.s.cas(ctx, storage_key, from, to, put).await?;
            runtime.reply(req, Request::CasOk {}).await
        }
        _ => done(runtime, req),
    }
}
examples/broadcast.rs (line 41)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/// Handles one message of the broadcast example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `self.snapshot()`.
/// * `Request::Broadcast` — when `try_add` returns true, forwards the element
///   to every neighbour via `call_async`; an ok reply is sent in either case.
/// * `Request::Topology` — stores this node's entry of the topology and replies ok.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let messages = self.snapshot();
            runtime.reply(req, Request::ReadOk { messages }).await
        }
        Ok(Request::Broadcast { message: element }) => {
            if self.try_add(element) {
                info!("messages now {}", element);
                runtime
                    .neighbours()
                    .for_each(|peer| runtime.call_async(peer, Request::Broadcast { message: element }));
            }
            runtime.reply_ok(req).await
        }
        Ok(Request::Topology { topology }) => {
            // Panics if the topology has no entry for this node.
            let peers = topology.get(runtime.node_id()).unwrap();
            self.inner.lock().unwrap().t = peers.clone();
            info!("My neighbors are {:?}", peers);
            runtime.reply_ok(req).await
        }
        _ => done(runtime, req),
    }
}
examples/g_set.rs (line 36)
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/// Dispatches one incoming message of the g-set example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `to_seq` of the locked `self.s`.
/// * `Request::Add` — inserts the element and replies with `AddOk`.
/// * `Request::ReplicateOne` / `Request::ReplicateFull` — insert the received
///   element(s); no reply is sent.
/// * `Request::Init` — starts a detached periodic replication task.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let value = to_seq(&self.s.lock().unwrap());
            runtime.reply(req, Request::ReadOk { value }).await
        }
        Ok(Request::Add { element }) => {
            self.s.lock().unwrap().insert(element);
            runtime.reply(req, Request::AddOk {}).await
        }
        Ok(Request::ReplicateOne { element }) => {
            self.s.lock().unwrap().insert(element);
            Ok(())
        }
        Ok(Request::ReplicateFull { value }) => {
            let mut guard = self.s.lock().unwrap();
            value.into_iter().for_each(|item| {
                guard.insert(item);
            });
            Ok(())
        }
        Ok(Request::Init {}) => {
            // Spawn directly on tokio (rather than through the runtime) so that
            // nothing waits for this task to complete: it loops forever.
            let (rt, node) = (runtime.clone(), self.clone());
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    debug!("emit replication signal");
                    // The lock is taken after the sleep and released at the end
                    // of each iteration, so it is never held across an await.
                    let guard = node.s.lock().unwrap();
                    for peer in rt.neighbours() {
                        // Best effort: the send result is intentionally discarded.
                        let _ = rt.send_async(peer, Request::ReplicateFull { value: to_seq(&guard) });
                    }
                }
            });
            Ok(())
        }
        _ => done(runtime, req),
    }
}
sourcepub async fn reply_ok(&self, req: Message) -> Result<()>
pub async fn reply_ok(&self, req: Message) -> Result<()>
Examples found in repository?
examples/broadcast.rs (line 51)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/// Handles one message of the broadcast example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `self.snapshot()`.
/// * `Request::Broadcast` — when `try_add` returns true, forwards the element
///   to every neighbour via `call_async`; an ok reply is sent in either case.
/// * `Request::Topology` — stores this node's entry of the topology and replies ok.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let messages = self.snapshot();
            runtime.reply(req, Request::ReadOk { messages }).await
        }
        Ok(Request::Broadcast { message: element }) => {
            if self.try_add(element) {
                info!("messages now {}", element);
                runtime
                    .neighbours()
                    .for_each(|peer| runtime.call_async(peer, Request::Broadcast { message: element }));
            }
            runtime.reply_ok(req).await
        }
        Ok(Request::Topology { topology }) => {
            // Panics if the topology has no entry for this node.
            let peers = topology.get(runtime.node_id()).unwrap();
            self.inner.lock().unwrap().t = peers.clone();
            info!("My neighbors are {:?}", peers);
            runtime.reply_ok(req).await
        }
        _ => done(runtime, req),
    }
}
pub fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>where T: Future + Send + 'static, T::Output: Send + 'static,
sourcepub fn rpc<T>(
&self,
to: impl Into<String>,
request: T
) -> impl Future<Output = Result<RPCResult>>where
T: Serialize,
pub fn rpc<T>( &self, to: impl Into<String>, request: T ) -> impl Future<Output = Result<RPCResult>>where T: Serialize,
`rpc()` makes a remote call to another node via the message-passing interface.
The context passed to `done_with()` on the returned `RPCResult` may serve as a timeout limiter.
`RPCResult` is immediately canceled on drop.
Example:
use maelstrom::{Error, Result, Runtime};
use std::fmt::{Display, Formatter};
use serde::Serialize;
use serde::Deserialize;
use tokio_context::context::Context;
/// Example client wrapper that issues requests through a `Runtime`.
pub struct Storage {
    // Passed as the `to` argument of `Runtime::rpc`, i.e. the destination of the call.
    typ: &'static str,
    // Runtime used to send the RPC.
    runtime: Runtime,
}
impl Storage {
async fn get<T>(&self, ctx: Context, key: String) -> Result<T>
where
T: Deserialize<'static> + Send,
{
let req = Message::Read::<String> { key };
let mut call = self.runtime.rpc(self.typ, req).await?;
let msg = call.done_with(ctx).await?;
let data = msg.body.as_obj::<Message<T>>()?;
match data {
Message::ReadOk { value } => Ok(value),
_ => Err(Box::new(Error::Custom(
-1,
"kv: protocol violated".to_string(),
))),
}
}
}
// Request/response bodies used by `Storage::get`.
// Serde encodes the variant as a snake_case string in the "type" field
// (internally tagged representation).
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
enum Message<T> {
    // Request: asks for the value stored under `key`.
    Read {
        key: String,
    },
    // Response: carries the value that was read.
    ReadOk {
        value: T,
    },
}
sourcepub async fn call<T>(
&self,
ctx: Context,
to: impl Into<String>,
request: T
) -> Result<Message>where
T: Serialize,
pub async fn call<T>( &self, ctx: Context, to: impl Into<String>, request: T ) -> Result<Message>where T: Serialize,
`call()` is the same as `let _: Result<Message> = rpc().await?.done_with(ctx).await;`.
For examples see `Runtime::rpc` and `RPCResult`.
`rpc()` makes a remote call to another node via the message-passing interface.
The provided context may serve as a timeout limiter.
`RPCResult` is immediately canceled on drop.
sourcepub fn call_async<T>(&self, to: impl Into<String>, request: T)where
T: Serialize + 'static,
pub fn call_async<T>(&self, to: impl Into<String>, request: T)where T: Serialize + 'static,
`call_async()` is equivalent to `runtime.spawn(runtime.call(...))`.
See `Runtime::call` and `Runtime::rpc`.
Examples found in repository?
examples/broadcast.rs (line 47)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/// Handles one message of the broadcast example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `self.snapshot()`.
/// * `Request::Broadcast` — when `try_add` returns true, forwards the element
///   to every neighbour via `call_async`; an ok reply is sent in either case.
/// * `Request::Topology` — stores this node's entry of the topology and replies ok.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let messages = self.snapshot();
            runtime.reply(req, Request::ReadOk { messages }).await
        }
        Ok(Request::Broadcast { message: element }) => {
            if self.try_add(element) {
                info!("messages now {}", element);
                runtime
                    .neighbours()
                    .for_each(|peer| runtime.call_async(peer, Request::Broadcast { message: element }));
            }
            runtime.reply_ok(req).await
        }
        Ok(Request::Topology { topology }) => {
            // Panics if the topology has no entry for this node.
            let peers = topology.get(runtime.node_id()).unwrap();
            self.inner.lock().unwrap().t = peers.clone();
            info!("My neighbors are {:?}", peers);
            runtime.reply_ok(req).await
        }
        _ => done(runtime, req),
    }
}
sourcepub fn node_id(&self) -> &str
pub fn node_id(&self) -> &str
Examples found in repository?
examples/broadcast.rs (line 54)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/// Handles one message of the broadcast example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `self.snapshot()`.
/// * `Request::Broadcast` — when `try_add` returns true, forwards the element
///   to every neighbour via `call_async`; an ok reply is sent in either case.
/// * `Request::Topology` — stores this node's entry of the topology and replies ok.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let messages = self.snapshot();
            runtime.reply(req, Request::ReadOk { messages }).await
        }
        Ok(Request::Broadcast { message: element }) => {
            if self.try_add(element) {
                info!("messages now {}", element);
                runtime
                    .neighbours()
                    .for_each(|peer| runtime.call_async(peer, Request::Broadcast { message: element }));
            }
            runtime.reply_ok(req).await
        }
        Ok(Request::Topology { topology }) => {
            // Panics if the topology has no entry for this node.
            let peers = topology.get(runtime.node_id()).unwrap();
            self.inner.lock().unwrap().t = peers.clone();
            info!("My neighbors are {:?}", peers);
            runtime.reply_ok(req).await
        }
        _ => done(runtime, req),
    }
}
pub fn nodes(&self) -> &[String]
pub fn set_membership_state(&self, state: MembershipState) -> Result<()>
pub async fn done(&self)
pub async fn run_with<R>(&self, input: BufReader<R>) -> Result<()>where R: AsyncRead + Unpin,
pub fn next_msg_id(&self) -> u64
pub fn empty_response() -> Value
pub fn is_client(&self, src: &String) -> bool
pub fn is_from_cluster(&self, src: &String) -> bool
sourcepub fn neighbours(&self) -> impl Iterator<Item = &String>
pub fn neighbours(&self) -> impl Iterator<Item = &String>
All nodes that are not this node.
Examples found in repository?
examples/broadcast.rs (line 46)
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/// Handles one message of the broadcast example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `self.snapshot()`.
/// * `Request::Broadcast` — when `try_add` returns true, forwards the element
///   to every neighbour via `call_async`; an ok reply is sent in either case.
/// * `Request::Topology` — stores this node's entry of the topology and replies ok.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let messages = self.snapshot();
            runtime.reply(req, Request::ReadOk { messages }).await
        }
        Ok(Request::Broadcast { message: element }) => {
            if self.try_add(element) {
                info!("messages now {}", element);
                runtime
                    .neighbours()
                    .for_each(|peer| runtime.call_async(peer, Request::Broadcast { message: element }));
            }
            runtime.reply_ok(req).await
        }
        Ok(Request::Topology { topology }) => {
            // Panics if the topology has no entry for this node.
            let peers = topology.get(runtime.node_id()).unwrap();
            self.inner.lock().unwrap().t = peers.clone();
            info!("My neighbors are {:?}", peers);
            runtime.reply_ok(req).await
        }
        _ => done(runtime, req),
    }
}
More examples
examples/g_set.rs (line 62)
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/// Dispatches one incoming message of the g-set example node.
///
/// * `Request::Read` — replies with `ReadOk` carrying `to_seq` of the locked `self.s`.
/// * `Request::Add` — inserts the element and replies with `AddOk`.
/// * `Request::ReplicateOne` / `Request::ReplicateFull` — insert the received
///   element(s); no reply is sent.
/// * `Request::Init` — starts a detached periodic replication task.
/// * anything else (including a body that fails to parse) is handed to `done`.
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
    let parsed: Result<Request> = req.body.as_obj();
    match parsed {
        Ok(Request::Read {}) => {
            let value = to_seq(&self.s.lock().unwrap());
            runtime.reply(req, Request::ReadOk { value }).await
        }
        Ok(Request::Add { element }) => {
            self.s.lock().unwrap().insert(element);
            runtime.reply(req, Request::AddOk {}).await
        }
        Ok(Request::ReplicateOne { element }) => {
            self.s.lock().unwrap().insert(element);
            Ok(())
        }
        Ok(Request::ReplicateFull { value }) => {
            let mut guard = self.s.lock().unwrap();
            value.into_iter().for_each(|item| {
                guard.insert(item);
            });
            Ok(())
        }
        Ok(Request::Init {}) => {
            // Spawn directly on tokio (rather than through the runtime) so that
            // nothing waits for this task to complete: it loops forever.
            let (rt, node) = (runtime.clone(), self.clone());
            tokio::spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    debug!("emit replication signal");
                    // The lock is taken after the sleep and released at the end
                    // of each iteration, so it is never held across an await.
                    let guard = node.s.lock().unwrap();
                    for peer in rt.neighbours() {
                        // Best effort: the send result is intentionally discarded.
                        let _ = rt.send_async(peer, Request::ReplicateFull { value: to_seq(&guard) });
                    }
                }
            });
            Ok(())
        }
        _ => done(runtime, req),
    }
}