rf_distributed/platform/
asynchronous.rs

1use crate::discovery::nbr_sensors_setup::NbrSensorSetup;
2use crate::discovery::Discovery;
3use crate::mailbox::{AsStates, Mailbox};
4use crate::message::Message;
5use crate::network::{asynchronous::Network, NetworkUpdate};
6use rf_core::context::Context;
7use rf_core::export::Export;
8use rf_core::lang::execution::round;
9use rf_core::vm::round_vm::RoundVM;
10use std::error::Error;
11use std::fmt::Display;
12use std::str::FromStr;
13
14/// This struct represents the platform on which the program is executed
15pub struct RuFiPlatform<M, N, D, S>
16where
17    M: Mailbox,
18    N: Network,
19    D: Discovery,
20    S: NbrSensorSetup,
21{
22    mailbox: M,
23    network: N,
24    context: Context,
25    discovery: D,
26    discovered_nbrs: Vec<i32>,
27    nbr_sensor_setup: S,
28}
29
30impl<M, N, D, S> RuFiPlatform<M, N, D, S>
31where
32    M: Mailbox,
33    N: Network,
34    D: Discovery,
35    S: NbrSensorSetup,
36{
37    /// Creates a new platform
38    pub fn new(mailbox: M, network: N, context: Context, discovery: D, setup: S) -> Self {
39        RuFiPlatform {
40            mailbox,
41            network,
42            context,
43            discovery,
44            discovered_nbrs: vec![],
45            nbr_sensor_setup: setup,
46        }
47    }
48
49    /// Runs indefinitely the program on the platform
50    ///
51    /// # Arguments
52    ///
53    /// * `program` - The aggregate program to be executed
54    ///
55    /// # Generic Arguments
56    ///
57    /// * `P` - The type of the aggregate program, it must be a function that takes a [RoundVM] and returns a [RoundVM] and a result of type `A`
58    /// * `A` - The type of the result of the aggregate program
59    pub async fn run_forever<P, A>(mut self, program: P) -> Result<(), Box<dyn Error>>
60    where
61        P: Fn(&mut RoundVM) -> A + Copy,
62        A: Clone + 'static + FromStr + Display,
63    {
64        loop {
65            self.pre_cycle();
66
67            single_cycle(
68                &mut self.mailbox,
69                &mut self.network,
70                &self.nbr_sensor_setup,
71                self.context.clone(),
72                program,
73            )
74            .await?;
75        }
76    }
77
78    pub async fn run_n_cycles<P, A>(mut self, program: P, n: usize) -> Result<(), Box<dyn Error>>
79    where
80        P: Fn(&mut RoundVM) -> A + Copy,
81        A: Clone + 'static + FromStr + Display,
82    {
83        for _ in 0..n {
84            self.pre_cycle();
85
86            single_cycle(
87                &mut self.mailbox,
88                &mut self.network,
89                &self.nbr_sensor_setup,
90                self.context.clone(),
91                program,
92            )
93            .await?;
94        }
95        Ok(())
96    }
97
98    /// Performs the pre-cycle operations
99    fn pre_cycle(&mut self) {
100        // STEP 1: Discover neighbours
101        let nbrs = self.discovery.discover_neighbors();
102        // STEP 2: Subscribe to the topics of the neighbours
103        let subscriptions: Vec<i32> = nbrs
104            .clone()
105            .into_iter()
106            .filter(|n| !self.discovered_nbrs.contains(n))
107            .collect();
108        self.discovered_nbrs.extend(subscriptions);
109    }
110}
111
112/// Performs a single step of the execution cycle of an aggregate program
113///
114/// # Arguments
115///
116/// * `mailbox` - The mailbox of the device
117/// * `network` - The network through which the device communicates
118/// * `context` - The context of the device
119/// * `program` - The aggregate program to be executed
120///
121/// # Generic Arguments
122///
123/// * `P` - The type of the aggregate program, it must be a function that takes a [RoundVM] and returns a [RoundVM] and a result of type `A`
124/// * `A` - The type of the result of the aggregate program
125///
126/// # Returns
127///
128/// * `Result<(), Box<dyn Error>>` - The result of the execution
129async fn single_cycle<P, A, M, N, S>(
130    mailbox: &mut M,
131    network: &mut N,
132    setup: &S,
133    context: Context,
134    program: P,
135) -> Result<(), Box<dyn Error>>
136where
137    P: Fn(&mut RoundVM) -> A,
138    A: Clone + 'static + FromStr + Display,
139    M: Mailbox,
140    N: Network,
141    S: NbrSensorSetup,
142{
143    //STEP 3: Retrieve the neighbouring exports from the mailbox
144    let states = mailbox.messages().as_states();
145
146    //STEP 4: Execute a round
147    let nbr_sensors = setup.nbr_sensor_setup(states.keys().cloned().collect());
148    let context = Context::new(
149        *context.self_id(),
150        context.local_sensors().clone(),
151        nbr_sensors,
152        states,
153    );
154    println!("CONTEXT: {:?}", context);
155    let mut vm = RoundVM::new(context);
156    vm.new_export_stack();
157    let result = round(&mut vm, program);
158    let self_export: Export = vm.export_data().clone();
159    println!("OUTPUT: {}\nEXPORT: {}\n", result, self_export);
160
161    //STEP 5: Publish the export
162    let msg = Message::new(*vm.self_id(), self_export, std::time::SystemTime::now());
163    let msg_ser = serde_json::to_string(&msg).unwrap();
164    network.send(*vm.self_id(), msg_ser).await?;
165
166    //STEP 6: Receive the neighbouring exports from the network
167    if let Ok(NetworkUpdate::Update { msg }) = network.receive().await {
168        if let Ok(msg) = serde_json::from_str(&msg) {
169            mailbox.enqueue(msg);
170        }
171    }
172    Ok(())
173}