rf_distributed/platform/
sync.rs1use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
2use crate::discovery::Discovery;
3use crate::mailbox::{AsStates, Mailbox};
4use crate::message::Message;
5use crate::network::{sync::Network, NetworkUpdate};
6use rf_core::context::Context;
7use rf_core::export::Export;
8use rf_core::lang::execution::round;
9use rf_core::vm::round_vm::RoundVM;
10use std::error::Error;
11use std::fmt::Display;
12use std::str::FromStr;
13pub struct SyncRuFiPlatform<M, N, D, S>
14where
15 M: Mailbox,
16 N: Network,
17 D: Discovery,
18 S: NbrSensorSetup,
19{
20 mailbox: M,
21 network: N,
22 context: Context,
23 discovery: D,
24 discovered_nbrs: Vec<i32>,
25 nbr_sensor_setup: S,
26}
27
28impl<M, N, D, S> SyncRuFiPlatform<M, N, D, S>
29where
30 M: Mailbox,
31 N: Network,
32 D: Discovery,
33 S: NbrSensorSetup,
34{
35 pub fn new(mailbox: M, network: N, context: Context, discovery: D, setup: S) -> Self {
36 SyncRuFiPlatform {
37 mailbox,
38 network,
39 context,
40 discovery,
41 discovered_nbrs: vec![],
42 nbr_sensor_setup: setup,
43 }
44 }
45
46 pub fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
47 where
48 P: Fn(&mut RoundVM) -> A + Copy,
49 A: Clone + 'static + FromStr + Display,
50 {
51 loop {
52 self.pre_cycle();
53
54 single_cycle(
55 &mut self.mailbox,
56 &mut self.network,
57 &self.nbr_sensor_setup,
58 self.context.clone(),
59 program,
60 )?;
61 }
62 }
63
64 pub fn run_n_cycles<P, A>(mut self, program: P, n: usize) -> Result<(), Box<dyn Error>>
65 where
66 P: Fn(&mut RoundVM) -> A + Copy,
67 A: Clone + 'static + FromStr + Display,
68 {
69 for _ in 0..n {
70 self.pre_cycle();
71
72 single_cycle(
73 &mut self.mailbox,
74 &mut self.network,
75 &self.nbr_sensor_setup,
76 self.context.clone(),
77 program,
78 )?;
79 }
80 Ok(())
81 }
82
83 fn pre_cycle(&mut self) {
85 let nbrs = self.discovery.discover_neighbors();
87 let subscriptions: Vec<i32> = nbrs
89 .clone()
90 .into_iter()
91 .filter(|n| !self.discovered_nbrs.contains(n))
92 .collect();
93 self.discovered_nbrs.extend(subscriptions);
94 }
95}
96
97fn single_cycle<P, A, M, N, S>(
98 mailbox: &mut M,
99 network: &mut N,
100 setup: &S,
101 context: Context,
102 program: P,
103) -> Result<(), Box<dyn Error>>
104where
105 P: Fn(&mut RoundVM) -> A + Copy,
106 A: Clone + 'static + FromStr + Display,
107 M: Mailbox,
108 N: Network,
109 S: NbrSensorSetup,
110{
111 let states = mailbox.messages().as_states();
113
114 let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
116 let context = Context::new(
117 *context.self_id(),
118 context.local_sensors().clone(),
119 nbr_sensors,
120 states,
121 );
122 println!("CONTEXT: {:?}", context);
123 let mut vm = RoundVM::new(context);
124 vm.new_export_stack();
125 let result = round(&mut vm, program);
126 let self_export: Export = vm.export_data().clone();
127 println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);
128
129 let msg = Message::new(*vm.self_id(), self_export, std::time::SystemTime::now());
131 let msg_ser = serde_json::to_string(&msg).unwrap();
132 network.send(*vm.self_id(), msg_ser)?;
133
134 if let Ok(NetworkUpdate::Update { msg }) = network.receive() {
136 if let Ok(msg) = serde_json::from_str(&msg) {
137 mailbox.enqueue(msg);
138 }
139 }
140 Ok(())
141}