1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::error::Error;
use std::fmt::Display;
use std::str::FromStr;
use rf_core::context::Context;
use rf_core::export::Export;
use rf_core::lang::execution::round;
use rf_core::vm::round_vm::RoundVM;
use crate::discovery::Discovery;
use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
use crate::mailbox::{AsStates, Mailbox};
use crate::message::Message;
use crate::network::{
    sync::Network,
    NetworkUpdate,
};
pub struct SyncRuFiPlatform {
    mailbox: Box<dyn Mailbox>,
    network: Box<dyn Network>,
    context: Context,
    discovery: Box<dyn Discovery>,
    discovered_nbrs: Vec<i32>,
    nbr_sensor_setup: Box<dyn NbrSensorSetup>,
}

impl SyncRuFiPlatform {
    pub fn new(
        mailbox: Box<dyn Mailbox>,
        network: Box<dyn Network>,
        context: Context,
        discovery: Box<dyn Discovery>,
        setup: Box<dyn NbrSensorSetup>,
    ) -> Self {
        SyncRuFiPlatform {
            mailbox,
            network,
            context,
            discovery,
            discovered_nbrs: vec![],
            nbr_sensor_setup: setup,
        }
    }

    pub fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
        where
            P: Fn(RoundVM) -> (RoundVM, A) + Copy,
            A: Clone + 'static + FromStr + Display,
    {
        loop {
            // STEP 1: Discover neighbours
            let nbrs = self.discovery.discover_neighbors();
            // STEP 2: Subscribe to the topics of the neighbours
            let subscriptions: Vec<i32> = nbrs
                .clone()
                .into_iter()
                .filter(|n| !self.discovered_nbrs.contains(n))
                .collect();
            self.discovered_nbrs.extend(subscriptions);

            single_cycle(
                &mut self.mailbox,
                &mut self.network,
                &self.nbr_sensor_setup,
                self.context.clone(),
                program,
            )?;
        }
    }

}

fn single_cycle<P, A>(
    mailbox: &mut Box<dyn Mailbox>,
    network: &mut Box<dyn Network>,
    setup: &Box<dyn NbrSensorSetup>,
    context: Context,
    program: P,
) -> Result<(), Box<dyn Error>>
    where
        P: Fn(RoundVM) -> (RoundVM, A),
        A: Clone + 'static + FromStr + Display,
{
    //STEP 3: Retrieve the neighbouring exports from the mailbox
    let states = mailbox.messages().as_states();

    //STEP 4: Execute a round
    let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
    let context = Context::new(
        *context.self_id(),
        context.local_sensors().clone(),
        nbr_sensors,
        states,
    );
    println!("CONTEXT: {:?}", context);
    let mut vm = RoundVM::new(context);
    vm.new_export_stack();
    let (mut vm_, result) = round(vm, program);
    let self_export: Export = vm_.export_data().clone();
    println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);

    //STEP 5: Publish the export
    let msg = Message::new(
        *vm_.self_id(),
        self_export,
        std::time::SystemTime::now(),
    );
    let msg_ser = serde_json::to_string(&msg).unwrap();
    network.send(*vm_.self_id(), msg_ser)?;

    //STEP 6: Receive the neighbouring exports from the network
    if let Ok(NetworkUpdate::Update { msg }) = network.receive() {
        let msg: Message = serde_json::from_str(&msg).unwrap();
        mailbox.enqueue(msg);
    }
    Ok(())
}