rf_distributed/platform/
asynchronous.rs1use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
2use crate::discovery::Discovery;
3use crate::mailbox::{AsStates, Mailbox};
4use crate::message::Message;
5use crate::network::{asynchronous::Network, NetworkUpdate};
6use rf_core::context::Context;
7use rf_core::export::Export;
8use rf_core::lang::execution::round;
9use rf_core::vm::round_vm::RoundVM;
10use std::error::Error;
11use std::fmt::Display;
12use std::str::FromStr;
13
14pub struct RuFiPlatform<M, N, D, S>
16where
17 M: Mailbox,
18 N: Network,
19 D: Discovery,
20 S: NbrSensorSetup,
21{
22 mailbox: M,
23 network: N,
24 context: Context,
25 discovery: D,
26 discovered_nbrs: Vec<i32>,
27 nbr_sensor_setup: S,
28}
29
30impl<M, N, D, S> RuFiPlatform<M, N, D, S>
31where
32 M: Mailbox,
33 N: Network,
34 D: Discovery,
35 S: NbrSensorSetup,
36{
37 pub fn new(mailbox: M, network: N, context: Context, discovery: D, setup: S) -> Self {
39 RuFiPlatform {
40 mailbox,
41 network,
42 context,
43 discovery,
44 discovered_nbrs: vec![],
45 nbr_sensor_setup: setup,
46 }
47 }
48
49 pub async fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
60 where
61 P: Fn(&mut RoundVM) -> A + Copy,
62 A: Clone + 'static + FromStr + Display,
63 {
64 loop {
65 self.pre_cycle();
66
67 single_cycle(
68 &mut self.mailbox,
69 &mut self.network,
70 &self.nbr_sensor_setup,
71 self.context.clone(),
72 program,
73 )
74 .await?;
75 }
76 }
77
78 pub async fn run_n_cycles<P, A>(mut self, program: P, n: usize) -> Result<(), Box<dyn Error>>
79 where
80 P: Fn(&mut RoundVM) -> A + Copy,
81 A: Clone + 'static + FromStr + Display,
82 {
83 for _ in 0..n {
84 self.pre_cycle();
85
86 single_cycle(
87 &mut self.mailbox,
88 &mut self.network,
89 &self.nbr_sensor_setup,
90 self.context.clone(),
91 program,
92 )
93 .await?;
94 }
95 Ok(())
96 }
97
98 fn pre_cycle(&mut self) {
100 let nbrs = self.discovery.discover_neighbors();
102 let subscriptions: Vec<i32> = nbrs
104 .clone()
105 .into_iter()
106 .filter(|n| !self.discovered_nbrs.contains(n))
107 .collect();
108 self.discovered_nbrs.extend(subscriptions);
109 }
110}
111
112async fn single_cycle<P, A, M, N, S>(
130 mailbox: &mut M,
131 network: &mut N,
132 setup: &S,
133 context: Context,
134 program: P,
135) -> Result<(), Box<dyn Error>>
136where
137 P: Fn(&mut RoundVM) -> A,
138 A: Clone + 'static + FromStr + Display,
139 M: Mailbox,
140 N: Network,
141 S: NbrSensorSetup,
142{
143 let states = mailbox.messages().as_states();
145
146 let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
148 let context = Context::new(
149 *context.self_id(),
150 context.local_sensors().clone(),
151 nbr_sensors,
152 states,
153 );
154 println!("CONTEXT: {:?}", context);
155 let mut vm = RoundVM::new(context);
156 vm.new_export_stack();
157 let result = round(&mut vm, program);
158 let self_export: Export = vm.export_data().clone();
159 println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);
160
161 let msg = Message::new(*vm.self_id(), self_export, std::time::SystemTime::now());
163 let msg_ser = serde_json::to_string(&msg).unwrap();
164 network.send(*vm.self_id(), msg_ser).await?;
165
166 if let Ok(NetworkUpdate::Update { msg }) = network.receive().await {
168 if let Ok(msg) = serde_json::from_str(&msg) {
169 mailbox.enqueue(msg);
170 }
171 }
172 Ok(())
173}