1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
use crate::discovery::Discovery;
use crate::mailbox::{AsStates, Mailbox};
use crate::message::Message;
use crate::network::{
    asynchronous::Network, 
    NetworkUpdate,
};
use rf_core::context::Context;
use rf_core::export::Export;
use rf_core::lang::execution::round;
use rf_core::vm::round_vm::RoundVM;
use std::error::Error;
use std::fmt::Display;
use std::str::FromStr;

/// This struct represents the platform on which the program is executed
pub struct RuFiPlatform {
    mailbox: Box<dyn Mailbox>,
    network: Box<dyn Network>,
    context: Context,
    discovery: Box<dyn Discovery>,
    discovered_nbrs: Vec<i32>,
    nbr_sensor_setup: Box<dyn NbrSensorSetup>,
}

impl RuFiPlatform {
    /// Creates a new platform
    pub fn new(
        mailbox: Box<dyn Mailbox>,
        network: Box<dyn Network>,
        context: Context,
        discovery: Box<dyn Discovery>,
        setup: Box<dyn NbrSensorSetup>,
    ) -> Self {
        RuFiPlatform {
            mailbox,
            network,
            context,
            discovery,
            discovered_nbrs: vec![],
            nbr_sensor_setup: setup,
        }
    }

    /// Runs indefinitely the program on the platform
    ///
    /// # Arguments
    ///
    /// * `program` - The aggregate program to be executed
    ///
    /// # Generic Arguments
    ///
    /// * `P` - The type of the aggregate program, it must be a function that takes a [RoundVM] and returns a [RoundVM] and a result of type `A`
    /// * `A` - The type of the result of the aggregate program
    pub async fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
        where
            P: Fn(RoundVM) -> (RoundVM, A) + Copy,
            A: Clone + 'static + FromStr + Display,
    {
        loop {
            // STEP 1: Discover neighbours
            let nbrs = self.discovery.discover_neighbors();
            // STEP 2: Subscribe to the topics of the neighbours
            let subscriptions: Vec<i32> = nbrs
                .clone()
                .into_iter()
                .filter(|n| !self.discovered_nbrs.contains(n))
                .collect();
            self.discovered_nbrs.extend(subscriptions);

            single_cycle(
                &mut self.mailbox,
                &mut self.network,
                &self.nbr_sensor_setup,
                self.context.clone(),
                program,
            )
                .await?;
        }
    }
}

/// Performs a single step of the execution cycle of an aggregate program
///
/// # Arguments
///
/// * `mailbox` - The mailbox of the device
/// * `network` - The network through which the device communicates
/// * `context` - The context of the device
/// * `program` - The aggregate program to be executed
///
/// # Generic Arguments
///
/// * `P` - The type of the aggregate program, it must be a function that takes a [RoundVM] and returns a [RoundVM] and a result of type `A`
/// * `A` - The type of the result of the aggregate program
///
/// # Returns
///
/// * `Result<(), Box<dyn Error>>` - The result of the execution
async fn single_cycle<P, A>(
    mailbox: &mut Box<dyn Mailbox>,
    network: &mut Box<dyn Network>,
    setup: &Box<dyn NbrSensorSetup>,
    context: Context,
    program: P,
) -> Result<(), Box<dyn Error>>
    where
        P: Fn(RoundVM) -> (RoundVM, A),
        A: Clone + 'static + FromStr + Display,
{
    //STEP 3: Retrieve the neighbouring exports from the mailbox
    let states = mailbox.messages().as_states();

    //STEP 4: Execute a round
    let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
    let context = Context::new(
        *context.self_id(),
        context.local_sensors().clone(),
        nbr_sensors,
        states,
    );
    println!("CONTEXT: {:?}", context);
    let mut vm = RoundVM::new(context);
    vm.new_export_stack();
    let (mut vm_, result) = round(vm, program);
    let self_export: Export = vm_.export_data().clone();
    println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);

    //STEP 5: Publish the export
    let msg = Message::new(
        *vm_.self_id(),
        self_export,
        std::time::SystemTime::now(),
    );
    let msg_ser = serde_json::to_string(&msg).unwrap();
    network.send(*vm_.self_id(), msg_ser).await?;

    //STEP 6: Receive the neighbouring exports from the network
    if let Ok(NetworkUpdate::Update { msg }) = network.receive().await {
        let msg: Message = serde_json::from_str(&msg).unwrap();
        mailbox.enqueue(msg);
    }
    Ok(())
}