rf_distributed/platform/
sync.rs

1use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
2use crate::discovery::Discovery;
3use crate::mailbox::{AsStates, Mailbox};
4use crate::message::Message;
5use crate::network::{sync::Network, NetworkUpdate};
6use rf_core::context::Context;
7use rf_core::export::Export;
8use rf_core::lang::execution::round;
9use rf_core::vm::round_vm::RoundVM;
10use std::error::Error;
11use std::fmt::Display;
12use std::str::FromStr;
13pub struct SyncRuFiPlatform<M, N, D, S>
14where
15    M: Mailbox,
16    N: Network,
17    D: Discovery,
18    S: NbrSensorSetup,
19{
20    mailbox: M,
21    network: N,
22    context: Context,
23    discovery: D,
24    discovered_nbrs: Vec<i32>,
25    nbr_sensor_setup: S,
26}
27
28impl<M, N, D, S> SyncRuFiPlatform<M, N, D, S>
29where
30    M: Mailbox,
31    N: Network,
32    D: Discovery,
33    S: NbrSensorSetup,
34{
35    pub fn new(mailbox: M, network: N, context: Context, discovery: D, setup: S) -> Self {
36        SyncRuFiPlatform {
37            mailbox,
38            network,
39            context,
40            discovery,
41            discovered_nbrs: vec![],
42            nbr_sensor_setup: setup,
43        }
44    }
45
46    pub fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
47    where
48        P: Fn(&mut RoundVM) -> A + Copy,
49        A: Clone + 'static + FromStr + Display,
50    {
51        loop {
52            self.pre_cycle();
53
54            single_cycle(
55                &mut self.mailbox,
56                &mut self.network,
57                &self.nbr_sensor_setup,
58                self.context.clone(),
59                program,
60            )?;
61        }
62    }
63
64    pub fn run_n_cycles<P, A>(mut self, program: P, n: usize) -> Result<(), Box<dyn Error>>
65        where
66            P: Fn(&mut RoundVM) -> A + Copy,
67            A: Clone + 'static + FromStr + Display,
68    {
69        for _ in 0..n {
70            self.pre_cycle();
71
72            single_cycle(
73                &mut self.mailbox,
74                &mut self.network,
75                &self.nbr_sensor_setup,
76                self.context.clone(),
77                program,
78            )?;
79        }
80        Ok(())
81    }
82
83    /// Performs the pre-cycle operations
84    fn pre_cycle(&mut self) {
85        // STEP 1: Discover neighbours
86        let nbrs = self.discovery.discover_neighbors();
87        // STEP 2: Subscribe to the topics of the neighbours
88        let subscriptions: Vec<i32> = nbrs
89            .clone()
90            .into_iter()
91            .filter(|n| !self.discovered_nbrs.contains(n))
92            .collect();
93        self.discovered_nbrs.extend(subscriptions);
94    }
95}
96
97fn single_cycle<P, A, M, N, S>(
98    mailbox: &mut M,
99    network: &mut N,
100    setup: &S,
101    context: Context,
102    program: P,
103) -> Result<(), Box<dyn Error>>
104where
105    P: Fn(&mut RoundVM) -> A + Copy,
106    A: Clone + 'static + FromStr + Display,
107    M: Mailbox,
108    N: Network,
109    S: NbrSensorSetup,
110{
111    //STEP 3: Retrieve the neighbouring exports from the mailbox
112    let states = mailbox.messages().as_states();
113
114    //STEP 4: Execute a round
115    let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
116    let context = Context::new(
117        *context.self_id(),
118        context.local_sensors().clone(),
119        nbr_sensors,
120        states,
121    );
122    println!("CONTEXT: {:?}", context);
123    let mut vm = RoundVM::new(context);
124    vm.new_export_stack();
125    let result = round(&mut vm, program);
126    let self_export: Export = vm.export_data().clone();
127    println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);
128
129    //STEP 5: Publish the export
130    let msg = Message::new(*vm.self_id(), self_export, std::time::SystemTime::now());
131    let msg_ser = serde_json::to_string(&msg).unwrap();
132    network.send(*vm.self_id(), msg_ser)?;
133
134    //STEP 6: Receive the neighbouring exports from the network
135    if let Ok(NetworkUpdate::Update { msg }) = network.receive() {
136        if let Ok(msg) = serde_json::from_str(&msg) {
137            mailbox.enqueue(msg);
138        }
139    }
140    Ok(())
141}