1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//! Storage API.

use async_trait::async_trait;
use derive_more::Deref;
use displaydoc::Display;
use num_enum::TryFromPrimitive;
use thiserror::Error;

use crate::state_machine::coordinator::CoordinatorState;
use xaynet_core::{
    common::RoundSeed,
    crypto::ByteObject,
    mask::{MaskObject, Model},
    LocalSeedDict,
    SeedDict,
    SumDict,
    SumParticipantEphemeralPublicKey,
    SumParticipantPublicKey,
    UpdateParticipantPublicKey,
};

/// The error type for storage operations that are not directly related to the application
/// domain. These include, for example, IO errors like broken pipe, file not found,
/// out-of-memory, etc.
pub type StorageError = anyhow::Error;

/// The result of a storage operation: `Ok(T)` on success, or a [`StorageError`] on failure.
pub type StorageResult<T> = Result<T, StorageError>;

#[async_trait]
/// An abstract coordinator storage.
///
/// Every method returns a [`StorageResult`]. Operations that can additionally be rejected for
/// PET protocol reasons report that rejection inside the `Ok` value through a dedicated
/// wrapper type ([`SumPartAdd`], [`LocalSeedDictAdd`], [`MaskScoreIncr`]).
pub trait CoordinatorStorage
where
    Self: Clone + Send + Sync + 'static,
{
    /// Sets a [`CoordinatorState`].
    ///
    /// # Behavior
    ///
    /// - If no state has been set yet, set the state and return `StorageResult::Ok(())`.
    /// - If a state already exists, override the state and return `StorageResult::Ok(())`.
    async fn set_coordinator_state(&mut self, state: &CoordinatorState) -> StorageResult<()>;

    /// Returns a [`CoordinatorState`].
    ///
    /// # Behavior
    ///
    /// - If no state has been set yet, return `StorageResult::Ok(Option::None)`.
    /// - If a state exists, return `StorageResult::Ok(Option::Some(CoordinatorState))`.
    async fn coordinator_state(&mut self) -> StorageResult<Option<CoordinatorState>>;

    /// Adds a sum participant entry to the [`SumDict`].
    ///
    /// # Behavior
    ///
    /// - If a sum participant has been successfully added, return `StorageResult::Ok(SumPartAdd)`
    ///   containing a `Result::Ok(())`.
    /// - If the participant could not be added due to a PET protocol error, return
    ///   the corresponding `StorageResult::Ok(SumPartAdd)` containing a
    ///   `Result::Err(SumPartAddError)`.
    async fn add_sum_participant(
        &mut self,
        pk: &SumParticipantPublicKey,
        ephm_pk: &SumParticipantEphemeralPublicKey,
    ) -> StorageResult<SumPartAdd>;

    /// Returns the [`SumDict`].
    ///
    /// # Behavior
    ///
    /// - If the sum dict does not exist, return `StorageResult::Ok(Option::None)`.
    /// - If the sum dict exists, return `StorageResult::Ok(Option::Some(SumDict))`.
    async fn sum_dict(&mut self) -> StorageResult<Option<SumDict>>;

    /// Adds the [`LocalSeedDict`] of the given [`UpdateParticipantPublicKey`] to the
    /// [`SeedDict`].
    ///
    /// # Behavior
    ///
    /// - If the local seed dict has been successfully added, return
    ///   `StorageResult::Ok(LocalSeedDictAdd)` containing a `Result::Ok(())`.
    /// - If the local seed dict could not be added due to a PET protocol error, return
    ///   the corresponding `StorageResult::Ok(LocalSeedDictAdd)` containing a
    ///   `Result::Err(LocalSeedDictAddError)`.
    async fn add_local_seed_dict(
        &mut self,
        update_pk: &UpdateParticipantPublicKey,
        local_seed_dict: &LocalSeedDict,
    ) -> StorageResult<LocalSeedDictAdd>;

    /// Returns the [`SeedDict`].
    ///
    /// # Behavior
    ///
    /// - If the seed dict does not exist, return `StorageResult::Ok(Option::None)`.
    /// - If the seed dict exists, return `StorageResult::Ok(Option::Some(SeedDict))`.
    async fn seed_dict(&mut self) -> StorageResult<Option<SeedDict>>;

    /// Increments the mask score of the given [`MaskObject`] by one.
    ///
    /// # Behavior
    ///
    /// - If the mask score has been successfully incremented, return
    ///   `StorageResult::Ok(MaskScoreIncr)` containing a `Result::Ok(())`.
    /// - If the mask score could not be incremented due to a PET protocol error,
    ///   return the corresponding `StorageResult::Ok(MaskScoreIncr)` containing a
    ///   `Result::Err(MaskScoreIncrError)`.
    async fn incr_mask_score(
        &mut self,
        pk: &SumParticipantPublicKey,
        mask: &MaskObject,
    ) -> StorageResult<MaskScoreIncr>;

    /// Returns the two masks with the highest score.
    ///
    /// # Behavior
    ///
    /// - If no masks exist, return `StorageResult::Ok(Option::None)`.
    /// - If only one mask exists, return this mask as
    ///   `StorageResult::Ok(Option::Some(Vec<(MaskObject, u64)>))`.
    /// - If two masks exist with the same score, return both as
    ///   `StorageResult::Ok(Option::Some(Vec<(MaskObject, u64)>))`.
    /// - If two masks exist with different scores, return both in descending order as
    ///   `StorageResult::Ok(Option::Some(Vec<(MaskObject, u64)>))`.
    async fn best_masks(&mut self) -> StorageResult<Option<Vec<(MaskObject, u64)>>>;

    /// Returns the number of unique masks.
    async fn number_of_unique_masks(&mut self) -> StorageResult<u64>;

    /// Deletes all coordinator data. This includes the coordinator
    /// state as well as the [`SumDict`], [`SeedDict`] and `mask` dictionary.
    async fn delete_coordinator_data(&mut self) -> StorageResult<()>;

    /// Deletes the [`SumDict`], [`SeedDict`] and `mask` dictionary.
    async fn delete_dicts(&mut self) -> StorageResult<()>;

    /// Sets the latest global model id.
    ///
    /// # Behavior
    ///
    /// - If no global model id has been set yet, set the new id and return `StorageResult::Ok(())`.
    /// - If the global model id already exists, override with the new id and
    ///   return `StorageResult::Ok(())`.
    async fn set_latest_global_model_id(&mut self, id: &str) -> StorageResult<()>;

    /// Returns the latest global model id.
    ///
    /// # Behavior
    ///
    /// - If the global model id does not exist, return `StorageResult::Ok(Option::None)`.
    /// - If the global model id exists, return `StorageResult::Ok(Option::Some(String))`.
    async fn latest_global_model_id(&mut self) -> StorageResult<Option<String>>;

    /// Checks if the [`CoordinatorStorage`] is ready to process requests.
    ///
    /// # Behavior
    ///
    /// - If the [`CoordinatorStorage`] is ready to process requests, return
    ///   `StorageResult::Ok(())`.
    /// - If the [`CoordinatorStorage`] cannot process requests, for example because of a
    ///   connection error, return `StorageResult::Err(error)`.
    async fn is_ready(&mut self) -> StorageResult<()>;
}

#[async_trait]
/// Abstraction over a backend that persists global models.
pub trait ModelStorage
where
    Self: Clone + Send + Sync + 'static,
{
    /// Stores a global model.
    ///
    /// # Behavior
    ///
    /// - When a global model with the same model id is already stored, returns
    ///   `StorageResult::Err(StorageError)`.
    /// - Otherwise the model is stored and `StorageResult::Ok(String)` is returned.
    async fn set_global_model(
        &mut self,
        round_id: u64,
        round_seed: &RoundSeed,
        global_model: &Model,
    ) -> StorageResult<String>;

    /// Looks up a global model by its id.
    ///
    /// # Behavior
    ///
    /// - When no global model is stored under `id`, returns `StorageResult::Ok(Option::None)`.
    /// - When the global model is found, returns `StorageResult::Ok(Option::Some(Model))`.
    async fn global_model(&mut self, id: &str) -> StorageResult<Option<Model>>;

    /// Builds a unique global model id out of the round id and the round seed of the round
    /// in which the global model was created.
    ///
    /// The default implementation yields `roundid_roundseed`, with the [`RoundSeed`]
    /// rendered as a hexadecimal string.
    fn create_global_model_id(round_id: u64, round_seed: &RoundSeed) -> String {
        format!("{}_{}", round_id, hex::encode(round_seed.as_slice()))
    }

    /// Reports whether the [`ModelStorage`] is able to process requests.
    ///
    /// # Behavior
    ///
    /// - When the [`ModelStorage`] is ready to process requests, returns
    ///   `StorageResult::Ok(())`.
    /// - When the [`ModelStorage`] cannot process requests, for example because of a
    ///   connection error, returns `StorageResult::Err(error)`.
    async fn is_ready(&mut self) -> StorageResult<()>;
}

/// A wrapper that contains the result of the "add sum participant" operation.
///
/// The derived `Deref` exposes the wrapped `Result<(), SumPartAddError>` by reference, so the
/// outcome can be inspected without consuming the wrapper.
#[derive(Deref)]
pub struct SumPartAdd(pub(crate) Result<(), SumPartAddError>);

impl SumPartAdd {
    /// Unwraps this wrapper, returning the underlying result.
    pub fn into_inner(self) -> Result<(), SumPartAddError> {
        self.0
    }
}

/// Error that can occur when adding a sum participant to the [`SumDict`].
//
// NOTE: `displaydoc::Display` turns each variant's doc comment into its `Display` message, so
// the one-line variant docs below are runtime strings; do not reword them casually.
// `#[repr(i64)]` together with `TryFromPrimitive` allows a variant to be recovered from its
// raw `i64` discriminant; presumably these are status codes reported by a storage backend —
// TODO confirm against the implementors.
#[derive(Display, Error, Debug, TryFromPrimitive)]
#[repr(i64)]
pub enum SumPartAddError {
    /// sum participant already exists
    AlreadyExists = 0,
}

/// A wrapper that contains the result of the "add local seed dict" operation.
///
/// The derived `Deref` exposes the wrapped `Result<(), LocalSeedDictAddError>` by reference,
/// so the outcome can be inspected without consuming the wrapper.
#[derive(Deref)]
pub struct LocalSeedDictAdd(pub(crate) Result<(), LocalSeedDictAddError>);

impl LocalSeedDictAdd {
    /// Unwraps this wrapper, returning the underlying result.
    pub fn into_inner(self) -> Result<(), LocalSeedDictAddError> {
        self.0
    }
}

/// Error that can occur when adding a local seed dict to the [`SeedDict`].
//
// NOTE: `displaydoc::Display` turns each variant's doc comment into its `Display` message, so
// the one-line variant docs below are runtime strings; do not reword them casually.
// `#[repr(i64)]` together with `TryFromPrimitive` allows a variant to be recovered from its
// raw `i64` discriminant; presumably these are status codes reported by a storage backend —
// TODO confirm against the implementors.
#[derive(Display, Error, Debug, TryFromPrimitive)]
#[repr(i64)]
pub enum LocalSeedDictAddError {
    /// the length of the local seed dict and the length of sum dict are not equal
    LengthMisMatch = -1,
    /// local dict contains an unknown sum participant
    UnknownSumParticipant = -2,
    /// update participant already submitted an update
    UpdatePkAlreadySubmitted = -3,
    /// update participant already exists in the inner update seed dict
    UpdatePkAlreadyExistsInUpdateSeedDict = -4,
}

/// A wrapper that contains the result of the "increment mask score" operation.
///
/// The derived `Deref` exposes the wrapped `Result<(), MaskScoreIncrError>` by reference, so
/// the outcome can be inspected without consuming the wrapper.
#[derive(Deref)]
pub struct MaskScoreIncr(pub(crate) Result<(), MaskScoreIncrError>);

impl MaskScoreIncr {
    /// Unwraps this wrapper, returning the underlying result.
    pub fn into_inner(self) -> Result<(), MaskScoreIncrError> {
        self.0
    }
}

/// Error that can occur when incrementing a mask score.
//
// NOTE: `displaydoc::Display` turns each variant's doc comment into its `Display` message, so
// the one-line variant docs below are runtime strings; do not reword them casually.
// `#[repr(i64)]` together with `TryFromPrimitive` allows a variant to be recovered from its
// raw `i64` discriminant; presumably these are status codes reported by a storage backend —
// TODO confirm against the implementors.
#[derive(Display, Error, Debug, TryFromPrimitive)]
#[repr(i64)]
pub enum MaskScoreIncrError {
    /// unknown sum participant
    UnknownSumPk = -1,
    /// sum participant submitted a mask already
    MaskAlreadySubmitted = -2,
}