1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
//! A state machine initializer.

use thiserror::Error;
#[cfg(feature = "model-persistence")]
use tracing::{debug, info};

#[cfg(feature = "model-persistence")]
use crate::settings::RestoreSettings;
use crate::{
    settings::{MaskSettings, ModelSettings, PetSettings},
    state_machine::{
        coordinator::CoordinatorState,
        events::{EventPublisher, EventSubscriber, ModelUpdate},
        phases::{Idle, PhaseName, PhaseState, Shared},
        requests::{RequestReceiver, RequestSender},
        StateMachine,
    },
    storage::{CoordinatorStorage, ModelStorage, StorageError, Store},
};

#[cfg(feature = "model-persistence")]
use xaynet_core::mask::Model;

/// Shorthand for a [`Result`] whose error type is [`StateMachineInitializationError`].
type StateMachineInitializationResult<T> = Result<T, StateMachineInitializationError>;

/// Error that can occur during the initialization of the [`StateMachine`].
///
/// The variants wrapping a [`StorageError`] include the underlying storage error in their
/// display message. The `String` variants carry a ready-made, human readable message.
#[derive(Debug, Error)]
pub enum StateMachineInitializationError {
    /// `sodiumoxide::init()` reported a failure.
    #[error("initializing crypto library failed")]
    CryptoInit,
    /// The store failed while reading the previous coordinator state.
    #[error("fetching coordinator state failed: {0}")]
    FetchCoordinatorState(StorageError),
    /// The store failed while deleting the coordinator data.
    #[error("deleting coordinator data failed: {0}")]
    DeleteCoordinatorData(StorageError),
    /// The store failed while reading the id of the latest global model.
    #[error("fetching latest global model id failed: {0}")]
    FetchLatestGlobalModelId(StorageError),
    /// The store failed while reading a global model.
    #[error("fetching global model failed: {0}")]
    FetchGlobalModel(StorageError),
    /// A global model id is known but the store returned no model for it.
    #[error("{0}")]
    GlobalModelUnavailable(String),
    /// The loaded global model does not match the model length of the coordinator state.
    #[error("{0}")]
    GlobalModelInvalid(String),
}

/// The state machine initializer that initializes a new state machine.
///
/// It bundles the settings and the storage handle that are needed to build the
/// [`CoordinatorState`] and the [`StateMachine`]. The initializer is consumed by `init`.
pub struct StateMachineInitializer<C, M>
where
    C: CoordinatorStorage,
    M: ModelStorage,
{
    // The three settings below are passed to `CoordinatorState::new` when the state
    // machine is created from settings.
    pet_settings: PetSettings,
    mask_settings: MaskSettings,
    model_settings: ModelSettings,
    // Only present with the `model-persistence` feature; its `enable` flag decides whether
    // `init` tries to restore a previous coordinator state.
    #[cfg(feature = "model-persistence")]
    restore_settings: RestoreSettings,

    // Storage handle used during initialization and afterwards moved into the state machine.
    store: Store<C, M>,
}

impl<C, M> StateMachineInitializer<C, M>
where
    C: CoordinatorStorage,
    M: ModelStorage,
{
    /// Creates a new [`StateMachineInitializer`].
    pub fn new(
        pet_settings: PetSettings,
        mask_settings: MaskSettings,
        model_settings: ModelSettings,
        #[cfg(feature = "model-persistence")] restore_settings: RestoreSettings,
        store: Store<C, M>,
    ) -> Self {
        Self {
            pet_settings,
            mask_settings,
            model_settings,
            #[cfg(feature = "model-persistence")]
            restore_settings,
            store,
        }
    }

    /// Initializes a new [`StateMachine`] with the given settings.
    ///
    /// The coordinator data in the store is deleted and a fresh [`CoordinatorState`] is
    /// created from the settings. No global model is published.
    ///
    /// # Errors
    /// - [`StateMachineInitializationError::CryptoInit`] if the crypto library cannot be
    ///   initialized.
    /// - [`StateMachineInitializationError::DeleteCoordinatorData`] if deleting the coordinator
    ///   data fails.
    #[cfg(not(feature = "model-persistence"))]
    pub async fn init(
        mut self,
    ) -> StateMachineInitializationResult<(StateMachine<C, M>, RequestSender, EventSubscriber)>
    {
        // crucial: init must be called before anything else in this module
        sodiumoxide::init().map_err(|_| StateMachineInitializationError::CryptoInit)?;

        let (fresh_state, model_update) = self.from_settings().await?;
        let components = self.init_state_machine(fresh_state, model_update);
        Ok(components)
    }

    /// Deletes all coordinator data and creates a new [`CoordinatorState`] from the
    /// given settings.
    ///
    /// Should only be called for the first start or if we need to perform a reset.
    /// The returned [`ModelUpdate`] is always [`ModelUpdate::Invalidate`].
    ///
    /// # Errors
    /// Fails with [`StateMachineInitializationError::DeleteCoordinatorData`] if the store
    /// cannot delete the coordinator data. In that case no state is created.
    pub(in crate::state_machine) async fn from_settings(
        &mut self,
    ) -> StateMachineInitializationResult<(CoordinatorState, ModelUpdate)> {
        // wipe the store first so that the fresh state never coexists with stale data
        let cleanup = self.store.delete_coordinator_data().await;
        cleanup.map_err(StateMachineInitializationError::DeleteCoordinatorData)?;

        let fresh_state = CoordinatorState::new(
            self.pet_settings,
            self.mask_settings,
            self.model_settings.clone(),
        );
        Ok((fresh_state, ModelUpdate::Invalidate))
    }

    /// Assembles the [`StateMachine`] and the handles that belong to it.
    ///
    /// Returns the state machine (starting in the idle phase), the [`RequestSender`] that
    /// is paired with the state machine's request receiver and the [`EventSubscriber`] that
    /// is paired with its event publisher.
    fn init_state_machine(
        self,
        coordinator_state: CoordinatorState,
        global_model: ModelUpdate,
    ) -> (StateMachine<C, M>, RequestSender, EventSubscriber) {
        // the event channels are seeded from the coordinator state before the state
        // itself is moved into the shared phase data
        let (publisher, subscriber) = EventPublisher::init(
            coordinator_state.round_id,
            coordinator_state.keys.clone(),
            coordinator_state.round_params.clone(),
            PhaseName::Idle,
            global_model,
        );
        let (receiver, sender) = RequestReceiver::new();

        let idle_phase = PhaseState::<Idle, _, _>::new(Shared::new(
            coordinator_state,
            publisher,
            receiver,
            self.store,
        ));

        (StateMachine::from(idle_phase), sender, subscriber)
    }
}

#[cfg(feature = "model-persistence")]
#[cfg_attr(docsrs, doc(cfg(feature = "model-persistence")))]
impl<C, M> StateMachineInitializer<C, M>
where
    C: CoordinatorStorage,
    M: ModelStorage,
{
    /// Initializes a new [`StateMachine`] by trying to restore the previous coordinator state
    /// along with the latest global model. After a successful initialization, the state machine
    /// always starts from a new round. This means that the round id is increased by one.
    /// If the state machine is reset during the initialization, the state machine starts
    /// with the round id `1`.
    ///
    /// # Behavior
    /// ![](https://mermaid.ink/svg/eyJjb2RlIjoic2VxdWVuY2VEaWFncmFtXG4gICAgYWx0IHJlc3RvcmUuZW5hYmxlID0gZmFsc2VcbiAgICAgICAgQ29vcmRpbmF0b3ItPj4rUmVkaXM6IGZsdXNoIGRiXG4gICAgICAgIE5vdGUgb3ZlciBDb29yZGluYXRvcixSZWRpczogc3RhcnQgZnJvbSBzZXR0aW5nc1xuICAgIGVsc2VcbiAgICAgICAgQ29vcmRpbmF0b3ItPj4rUmVkaXM6IGdldCBzdGF0ZVxuICAgICAgICBSZWRpcy0tPj4tQ29vcmRpbmF0b3I6IHN0YXRlXG4gICAgICAgIGFsdCBzdGF0ZSBub24tZXhpc3RlbnRcbiAgICAgICAgICAgIENvb3JkaW5hdG9yLT4-K1JlZGlzOiBmbHVzaCBkYlxuICAgICAgICAgICAgTm90ZSBvdmVyIENvb3JkaW5hdG9yLFJlZGlzOiBzdGFydCBmcm9tIHNldHRpbmdzXG4gICAgICAgIGVsc2Ugc3RhdGUgZXhpc3RcbiAgICAgICAgICAgIENvb3JkaW5hdG9yLT4-K1JlZGlzOiBnZXQgbGF0ZXN0IGdsb2JhbCBtb2RlbCBpZFxuICAgICAgICAgICAgUmVkaXMtLT4-LUNvb3JkaW5hdG9yOiBnbG9iYWwgbW9kZWwgaWRcbiAgICAgICAgICAgIGFsdCBnbG9iYWwgbW9kZWwgaWQgbm9uLWV4aXN0ZW50XG4gICAgICAgICAgICAgICAgTm90ZSBvdmVyIENvb3JkaW5hdG9yLFMzOiByZXN0b3JlIGNvb3JkaW5hdG9yIHdpdGggbGF0ZXN0IHN0YXRlIGJ1dCB3aXRob3V0IGEgZ2xvYmFsIG1vZGVsXG4gICAgICAgICAgICBlbHNlIGdsb2JhbCBtb2RlbCBpZCBleGlzdFxuICAgICAgICAgICAgICBDb29yZGluYXRvci0-PitTMzogZ2V0IGdsb2JhbCBtb2RlbFxuICAgICAgICAgICAgICBTMy0tPj4tQ29vcmRpbmF0b3I6IGdsb2JhbCBtb2RlbFxuICAgICAgICAgICAgICBhbHQgZ2xvYmFsIG1vZGVsIG5vbi1leGlzdGVudFxuICAgICAgICAgICAgICAgIE5vdGUgb3ZlciBDb29yZGluYXRvcixTMzogZXhpdCB3aXRoIGVycm9yXG4gICAgICAgICAgICAgIGVsc2UgZ2xvYmFsIG1vZGVsIGV4aXN0XG4gICAgICAgICAgICAgICAgTm90ZSBvdmVyIENvb3JkaW5hdG9yLFMzOiByZXN0b3JlIGNvb3JkaW5hdG9yIHdpdGggbGF0ZXN0IHN0YXRlIGFuZCBsYXRlc3QgZ2xvYmFsIG1vZGVsXG4gICAgICAgICAgICAgIGVuZFxuICAgICAgICAgICAgZW5kXG4gICAgICAgICAgZW5kXG4gICAgICAgIGVuZCIsIm1lcm1haWQiOnsidGhlbWUiOiJkZWZhdWx0IiwidGhlbWVWYXJpYWJsZXMiOnsiYmFja2dyb3VuZCI6IndoaXRlIiwicHJpbWFyeUNvbG9yIjoiI0VDRUNGRiIsInNlY29uZGFyeUNvbG9yIjoiI2ZmZmZkZSIsInRlcnRpYXJ5Q29sb3IiOiJoc2woODAsIDEwMCUsIDk2LjI3NDUwOTgwMzklKSIsInByaW1hcnlCb3JkZXJDb2xvciI6ImhzbCgyNDAsIDYwJSwgODYuMjc0NTA5ODAzOSUpIiwic2Vjb25kYXJ5Qm9yZGVyQ29sb3IiOiJoc2woNjAsIDYwJSwgODMuNTI5NDExNzY0NyUpIiwidGVydGlhcnlCb3JkZXJDb2xvciI6ImhzbCg4MCwgNjAlLCA4Ni4yNzQ1MDk4MDM5JSkiLCJwcmltYXJ5VGV4dENvbG9y
IjoiIzEzMTMwMCIsInNlY29uZGFyeVRleHRDb2xvciI6IiMwMDAwMjEiLCJ0ZXJ0aWFyeVRleHRDb2xvciI6InJnYig5LjUwMDAwMDAwMDEsIDkuNTAwMDAwMDAwMSwgOS41MDAwMDAwMDAxKSIsImxpbmVDb2xvciI6IiMzMzMzMzMiLCJ0ZXh0Q29sb3IiOiIjMzMzIiwibWFpbkJrZyI6IiNFQ0VDRkYiLCJzZWNvbmRCa2ciOiIjZmZmZmRlIiwiYm9yZGVyMSI6IiM5MzcwREIiLCJib3JkZXIyIjoiI2FhYWEzMyIsImFycm93aGVhZENvbG9yIjoiIzMzMzMzMyIsImZvbnRGYW1pbHkiOiJcInRyZWJ1Y2hldCBtc1wiLCB2ZXJkYW5hLCBhcmlhbCIsImZvbnRTaXplIjoiMTZweCIsImxhYmVsQmFja2dyb3VuZCI6IiNlOGU4ZTgiLCJub2RlQmtnIjoiI0VDRUNGRiIsIm5vZGVCb3JkZXIiOiIjOTM3MERCIiwiY2x1c3RlckJrZyI6IiNmZmZmZGUiLCJjbHVzdGVyQm9yZGVyIjoiI2FhYWEzMyIsImRlZmF1bHRMaW5rQ29sb3IiOiIjMzMzMzMzIiwidGl0bGVDb2xvciI6IiMzMzMiLCJlZGdlTGFiZWxCYWNrZ3JvdW5kIjoiI2U4ZThlOCIsImFjdG9yQm9yZGVyIjoiaHNsKDI1OS42MjYxNjgyMjQzLCA1OS43NzY1MzYzMTI4JSwgODcuOTAxOTYwNzg0MyUpIiwiYWN0b3JCa2ciOiIjRUNFQ0ZGIiwiYWN0b3JUZXh0Q29sb3IiOiJibGFjayIsImFjdG9yTGluZUNvbG9yIjoiZ3JleSIsInNpZ25hbENvbG9yIjoiIzMzMyIsInNpZ25hbFRleHRDb2xvciI6IiMzMzMiLCJsYWJlbEJveEJrZ0NvbG9yIjoiI0VDRUNGRiIsImxhYmVsQm94Qm9yZGVyQ29sb3IiOiJoc2woMjU5LjYyNjE2ODIyNDMsIDU5Ljc3NjUzNjMxMjglLCA4Ny45MDE5NjA3ODQzJSkiLCJsYWJlbFRleHRDb2xvciI6ImJsYWNrIiwibG9vcFRleHRDb2xvciI6ImJsYWNrIiwibm90ZUJvcmRlckNvbG9yIjoiI2FhYWEzMyIsIm5vdGVCa2dDb2xvciI6IiNmZmY1YWQiLCJub3RlVGV4dENvbG9yIjoiYmxhY2siLCJhY3RpdmF0aW9uQm9yZGVyQ29sb3IiOiIjNjY2IiwiYWN0aXZhdGlvbkJrZ0NvbG9yIjoiI2Y0ZjRmNCIsInNlcXVlbmNlTnVtYmVyQ29sb3IiOiJ3aGl0ZSIsInNlY3Rpb25Ca2dDb2xvciI6InJnYmEoMTAyLCAxMDIsIDI1NSwgMC40OSkiLCJhbHRTZWN0aW9uQmtnQ29sb3IiOiJ3aGl0ZSIsInNlY3Rpb25Ca2dDb2xvcjIiOiIjZmZmNDAwIiwidGFza0JvcmRlckNvbG9yIjoiIzUzNGZiYyIsInRhc2tCa2dDb2xvciI6IiM4YTkwZGQiLCJ0YXNrVGV4dExpZ2h0Q29sb3IiOiJ3aGl0ZSIsInRhc2tUZXh0Q29sb3IiOiJ3aGl0ZSIsInRhc2tUZXh0RGFya0NvbG9yIjoiYmxhY2siLCJ0YXNrVGV4dE91dHNpZGVDb2xvciI6ImJsYWNrIiwidGFza1RleHRDbGlja2FibGVDb2xvciI6IiMwMDMxNjMiLCJhY3RpdmVUYXNrQm9yZGVyQ29sb3IiOiIjNTM0ZmJjIiwiYWN0aXZlVGFza0JrZ0NvbG9yIjoiI2JmYzdmZiIsImdyaWRDb2xvciI6ImxpZ2h0Z3JleSIsImRvbmVUYXNrQmtnQ29sb3IiOiJsaWdodGdyZXkiLCJkb25lVGFza0JvcmRlckNvbG9yIjoiZ3JleSIsImNyaXRCb3JkZXJD
b2xvciI6IiNmZjg4ODgiLCJjcml0QmtnQ29sb3IiOiJyZWQiLCJ0b2RheUxpbmVDb2xvciI6InJlZCIsImxhYmVsQ29sb3IiOiJibGFjayIsImVycm9yQmtnQ29sb3IiOiIjNTUyMjIyIiwiZXJyb3JUZXh0Q29sb3IiOiIjNTUyMjIyIiwiY2xhc3NUZXh0IjoiIzEzMTMwMCIsImZpbGxUeXBlMCI6IiNFQ0VDRkYiLCJmaWxsVHlwZTEiOiIjZmZmZmRlIiwiZmlsbFR5cGUyIjoiaHNsKDMwNCwgMTAwJSwgOTYuMjc0NTA5ODAzOSUpIiwiZmlsbFR5cGUzIjoiaHNsKDEyNCwgMTAwJSwgOTMuNTI5NDExNzY0NyUpIiwiZmlsbFR5cGU0IjoiaHNsKDE3NiwgMTAwJSwgOTYuMjc0NTA5ODAzOSUpIiwiZmlsbFR5cGU1IjoiaHNsKC00LCAxMDAlLCA5My41Mjk0MTE3NjQ3JSkiLCJmaWxsVHlwZTYiOiJoc2woOCwgMTAwJSwgOTYuMjc0NTA5ODAzOSUpIiwiZmlsbFR5cGU3IjoiaHNsKDE4OCwgMTAwJSwgOTMuNTI5NDExNzY0NyUpIn19LCJ1cGRhdGVFZGl0b3IiOmZhbHNlfQ)
    ///
    /// - If the [`RestoreSettings.enable`] flag is set to `false`, the current coordinator
    ///   state will be reset and a new [`StateMachine`] is created with the given settings.
    /// - If no coordinator state exists, the current coordinator state will be reset and a new
    ///   [`StateMachine`] is created with the given settings.
    /// - If a coordinator state exists but no global model has been created so far, the
    ///   [`StateMachine`] will be restored with the coordinator state but without a global model.
    /// - If a coordinator state and a global model exist, the [`StateMachine`] will be restored
    ///   with the coordinator state and the global model.
    /// - If a global model has been created but does not exist anymore, the initialization will
    ///   fail with [`StateMachineInitializationError::GlobalModelUnavailable`].
    /// - If a global model exists but its properties do not match the coordinator model settings,
    ///   the initialization will fail with [`StateMachineInitializationError::GlobalModelInvalid`].
    /// - Any network error will cause the initialization to fail.
    pub async fn init(
        mut self,
    ) -> StateMachineInitializationResult<(StateMachine<C, M>, RequestSender, EventSubscriber)>
    {
        // crucial: init must be called before anything else in this module
        sodiumoxide::init().map_err(|_| StateMachineInitializationError::CryptoInit)?;

        // either start from scratch or try to pick up the previously persisted state
        let initial = if !self.restore_settings.enable {
            info!("restoring coordinator state is disabled");
            info!("initialize state machine from settings");
            self.from_settings().await
        } else {
            self.from_previous_state().await
        };
        let (coordinator_state, global_model) = initial?;

        Ok(self.init_state_machine(coordinator_state, global_model))
    }

    /// Tries to restore the coordinator from the state found in the store and falls back
    /// to a start from settings if the store holds no coordinator state.
    ///
    /// See [`StateMachineInitializer::init`] for the overall behavior.
    ///
    /// # Errors
    /// Fails with [`StateMachineInitializationError::FetchCoordinatorState`] if the state
    /// cannot be fetched, or with any error of the restore / reset step that follows.
    async fn from_previous_state(
        &mut self,
    ) -> StateMachineInitializationResult<(CoordinatorState, ModelUpdate)> {
        let previous_state = self
            .store
            .coordinator_state()
            .await
            .map_err(StateMachineInitializationError::FetchCoordinatorState)?;

        match previous_state {
            Some(coordinator_state) => self.try_restore_state(coordinator_state).await,
            // no coordinator state available seems to be a fresh start
            None => self.from_settings().await,
        }
    }

    /// Pairs the given coordinator state with the latest global model, if there is one.
    ///
    /// If the store knows no global model id, the state is returned together with
    /// [`ModelUpdate::Invalidate`]. Otherwise the model is loaded and validated and returned
    /// as [`ModelUpdate::New`].
    ///
    /// See [`StateMachineInitializer::init`] for the overall behavior.
    ///
    /// # Errors
    /// Fails with [`StateMachineInitializationError::FetchLatestGlobalModelId`] if the id
    /// cannot be fetched, or with any error of `load_global_model`.
    async fn try_restore_state(
        &mut self,
        coordinator_state: CoordinatorState,
    ) -> StateMachineInitializationResult<(CoordinatorState, ModelUpdate)> {
        let latest_id = self
            .store
            .latest_global_model_id()
            .await
            .map_err(StateMachineInitializationError::FetchLatestGlobalModelId)?;

        let model_update = match latest_id {
            Some(global_model_id) => {
                let global_model = self
                    .load_global_model(&coordinator_state, &global_model_id)
                    .await?;

                debug!(
                    "restore coordinator with global model id: {}",
                    global_model_id
                );
                ModelUpdate::New(std::sync::Arc::new(global_model))
            }
            // The state machine was shut down before completing a round.
            // The round_id cannot be used to detect this because it is incremented after
            // each restart: a round id larger than one does not imply that a round has
            // ever been completed.
            None => {
                debug!("apparently no round has been completed yet");
                debug!("restore coordinator without a global model");
                ModelUpdate::Invalidate
            }
        };

        Ok((coordinator_state, model_update))
    }

    /// Loads the global model with the given id and checks that it fits the coordinator
    /// state.
    ///
    /// # Errors
    /// - [`StateMachineInitializationError::FetchGlobalModel`] if the store fails.
    /// - [`StateMachineInitializationError::GlobalModelUnavailable`] if the store has no
    ///   model for `global_model_id`.
    /// - [`StateMachineInitializationError::GlobalModelInvalid`] if the length of the model
    ///   differs from the model length of the round parameters.
    async fn load_global_model(
        &mut self,
        coordinator_state: &CoordinatorState,
        global_model_id: &str,
    ) -> StateMachineInitializationResult<Model> {
        let stored_model = self
            .store
            .global_model(global_model_id)
            .await
            .map_err(StateMachineInitializationError::FetchGlobalModel)?;

        // The model id exists but the model cannot be found in the model store.
        // Better to fail here: if the coordinator is restarted with an empty model, the
        // clients will throw away their current global model and start from scratch.
        let global_model = stored_model.ok_or_else(|| {
            StateMachineInitializationError::GlobalModelUnavailable(format!(
                "cannot find global model {}",
                global_model_id
            ))
        })?;

        if !Self::model_properties_matches_settings(coordinator_state, &global_model) {
            let error_msg = format!(
                "the length of global model with the id {} does not match with the value of the model length setting {} != {}",
                global_model_id,
                global_model.len(),
                coordinator_state.round_params.model_length
            );
            return Err(StateMachineInitializationError::GlobalModelInvalid(
                error_msg,
            ));
        }

        Ok(global_model)
    }

    /// Returns `true` if the loaded global model is compatible with the coordinator state.
    ///
    /// Currently the only property that is compared is the model length of the round
    /// parameters against the length of the model.
    fn model_properties_matches_settings(
        coordinator_state: &CoordinatorState,
        global_model: &Model,
    ) -> bool {
        let round_params = &coordinator_state.round_params;
        round_params.model_length == global_model.len()
    }
}