Skip to main content

nest_data_source_api/
batch_messages.rs

1use crate::api::{ApiError, PseudonymServiceErrorHandler};
2use libpep::data::json::{EncryptedPEPJSONValue, PEPJSONBuilder, PEPJSONValue};
3use libpep::factors::PseudonymizationDomain;
4use paas_client::pseudonym_service::PseudonymService;
5use paas_client::sessions::EncryptionContexts;
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use serde_repr::{Deserialize_repr, Serialize_repr};
9use std::fmt::Debug;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize_repr, Deserialize_repr)]
12#[repr(u16)]
13pub enum StatusCode {
14    Ok = 200,
15    BadRequest = 400,
16    Unauthorized = 401,
17    Forbidden = 403,
18    NotFound = 404,
19    InternalError = 500,
20}
21
22#[derive(Debug, Clone)]
23pub struct Record {
24    pub participant: String,
25    pub extra_data: Value,
26    pub result_code: StatusCode,
27}
28
29impl Record {
30    pub fn ok(participant: String, extra_data: Value) -> Self {
31        Self::with_code(participant, extra_data, StatusCode::Ok)
32    }
33
34    pub fn not_found(participant: String, extra_data: Value) -> Self {
35        Self::with_code(participant, extra_data, StatusCode::NotFound)
36    }
37
38    pub fn forbidden(participant: String, extra_data: Value) -> Self {
39        Self::with_code(participant, extra_data, StatusCode::Forbidden)
40    }
41
42    pub fn server_error(participant: String, extra_data: Value) -> Self {
43        Self::with_code(participant, extra_data, StatusCode::InternalError)
44    }
45
46    pub fn bad_request(participant: String, error_message: &str) -> Self {
47        let error_data = json!({
48            "message": error_message
49        });
50        Self::with_code(participant, error_data, StatusCode::BadRequest)
51    }
52
53    pub fn data_processing_error(participant: String, error_message: &str) -> Self {
54        let error_data = json!({
55            "message": error_message
56        });
57        Self::with_code(participant, error_data, StatusCode::InternalError)
58    }
59
60    pub fn with_code(participant: String, extra_data: Value, code: StatusCode) -> Self {
61        Self {
62            participant,
63            extra_data,
64            result_code: code,
65        }
66    }
67
68    /// Convert to a PEPJSONValue with participants marked as pseudonyms.
69    fn to_pep_json(&self) -> Result<PEPJSONValue, String> {
70        let fields = json!({
71            "participants": self.participant,
72            "extra_data": self.extra_data,
73            "result_code": self.result_code as u16,
74        });
75        Ok(PEPJSONBuilder::from_json(&fields, &["participants"])
76            .ok_or("Failed to build PEPJSONValue".to_string())?
77            .build())
78    }
79
80    /// Try to create from a decrypted PEPJSONValue.
81    fn from_pep_json(pep_value: &PEPJSONValue) -> Result<Self, String> {
82        let json_value = pep_value.to_value().map_err(|e| e.to_string())?;
83
84        let participant = json_value
85            .get("participants")
86            .ok_or("PEPJSONValue missing 'participants' field".to_string())?
87            .as_str()
88            .ok_or("'participants' field is not a string".to_string())?
89            .to_string();
90
91        let extra_data = json_value.get("extra_data").cloned().unwrap_or(Value::Null);
92
93        let result_code = json_value
94            .get("result_code")
95            .and_then(|v| v.as_u64())
96            .map(|code| match code {
97                200 => StatusCode::Ok,
98                400 => StatusCode::BadRequest,
99                401 => StatusCode::Unauthorized,
100                403 => StatusCode::Forbidden,
101                404 => StatusCode::NotFound,
102                _ => StatusCode::InternalError,
103            })
104            .unwrap_or(StatusCode::Ok);
105
106        Ok(Self {
107            participant,
108            extra_data,
109            result_code,
110        })
111    }
112}
113
114#[derive(Debug, Serialize, Deserialize)]
115pub struct PEPActivityBatchRequest {
116    pub record: Vec<EncryptedPEPJSONValue>,
117    pub activity: String,
118    pub sessions: EncryptionContexts,
119    pub domain: PseudonymizationDomain,
120    pub domain_to: PseudonymizationDomain,
121}
122
123#[derive(Debug)]
124pub struct ActivityBatchRequest {
125    pub record: Vec<Record>,
126    pub activity: String,
127    pub domain: PseudonymizationDomain,
128}
129
130#[derive(Debug)]
131pub struct ActivityBatchResponse {
132    pub record: Vec<Record>,
133    pub activity: String,
134    pub domain: PseudonymizationDomain,
135}
136
137#[derive(Debug, Serialize, Deserialize)]
138pub struct PEPActivityBatchResponse {
139    pub record: Vec<EncryptedPEPJSONValue>,
140    pub activity: String,
141    pub sessions: EncryptionContexts,
142    pub domain: PseudonymizationDomain,
143    pub domain_to: PseudonymizationDomain,
144}
145
146#[allow(dead_code)]
147pub trait HasBatchInfo<T> {
148    fn participants(&self) -> Vec<String>;
149    fn extra_data(&self) -> Vec<Value>;
150    fn record(&self) -> Vec<T>;
151    fn domain(&self) -> PseudonymizationDomain;
152}
153
154#[allow(dead_code)]
155pub trait HasPEPBatchInfo<T> {
156    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue>;
157    fn record(&self) -> Vec<T>;
158
159    fn domain(&self) -> PseudonymizationDomain;
160    fn sessions(&self) -> EncryptionContexts;
161    fn domain_to(&self) -> PseudonymizationDomain;
162}
163
164pub trait PEPBatchMessageType<PepT, T>: Debug {
165    type PEPBatchMessage: HasPEPBatchInfo<PepT> + Serialize + Debug;
166    type BatchMessage: HasBatchInfo<T> + Debug;
167
168    fn pack(
169        request: Self::BatchMessage,
170        domain_to: PseudonymizationDomain,
171        ps: &mut PseudonymService,
172    ) -> Result<Self::PEPBatchMessage, ApiError>;
173
174    #[allow(async_fn_in_trait)]
175    async fn unpack(
176        request: Self::PEPBatchMessage,
177        ps: &mut PseudonymService,
178    ) -> Result<Self::BatchMessage, ApiError>;
179}
180
181impl HasBatchInfo<Record> for ActivityBatchRequest {
182    fn participants(&self) -> Vec<String> {
183        self.record.iter().map(|r| r.participant.clone()).collect()
184    }
185
186    fn extra_data(&self) -> Vec<Value> {
187        self.record.iter().map(|r| r.extra_data.clone()).collect()
188    }
189
190    fn record(&self) -> Vec<Record> {
191        self.record.clone()
192    }
193
194    fn domain(&self) -> PseudonymizationDomain {
195        self.domain.clone()
196    }
197}
198
199impl HasBatchInfo<Record> for ActivityBatchResponse {
200    fn participants(&self) -> Vec<String> {
201        self.record.iter().map(|r| r.participant.clone()).collect()
202    }
203
204    fn extra_data(&self) -> Vec<Value> {
205        self.record.iter().map(|r| r.extra_data.clone()).collect()
206    }
207
208    fn record(&self) -> Vec<Record> {
209        self.record.clone()
210    }
211
212    fn domain(&self) -> PseudonymizationDomain {
213        self.domain.clone()
214    }
215}
216
217impl HasPEPBatchInfo<EncryptedPEPJSONValue> for PEPActivityBatchRequest {
218    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue> {
219        self.record.clone()
220    }
221
222    fn record(&self) -> Vec<EncryptedPEPJSONValue> {
223        self.record.clone()
224    }
225
226    fn domain(&self) -> PseudonymizationDomain {
227        self.domain.clone()
228    }
229
230    fn sessions(&self) -> EncryptionContexts {
231        self.sessions.clone()
232    }
233
234    fn domain_to(&self) -> PseudonymizationDomain {
235        self.domain_to.clone()
236    }
237}
238
239impl HasPEPBatchInfo<EncryptedPEPJSONValue> for PEPActivityBatchResponse {
240    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue> {
241        self.record.clone()
242    }
243
244    fn record(&self) -> Vec<EncryptedPEPJSONValue> {
245        self.record.clone()
246    }
247
248    fn domain(&self) -> PseudonymizationDomain {
249        self.domain.clone()
250    }
251
252    fn sessions(&self) -> EncryptionContexts {
253        self.sessions.clone()
254    }
255
256    fn domain_to(&self) -> PseudonymizationDomain {
257        self.domain_to.clone()
258    }
259}
260
261fn encrypt_records(
262    records: &[Record],
263    ps: &mut PseudonymService,
264) -> Result<(Vec<EncryptedPEPJSONValue>, EncryptionContexts), ApiError> {
265    if records.is_empty() {
266        return Err(ApiError::Internal("No data points to encrypt".to_string()));
267    }
268
269    let mut rng = rand::rng();
270
271    let json_records: Vec<PEPJSONValue> = records
272        .iter()
273        .map(|r| r.to_pep_json())
274        .collect::<Result<Vec<_>, _>>()
275        .map_err(ApiError::Internal)?;
276
277    ps.encrypt_batch(&json_records, &mut rng)
278        .handle_pseudonym_error()
279}
280
281async fn decrypt_records(
282    pep_message: &impl HasPEPBatchInfo<EncryptedPEPJSONValue>,
283    ps: &mut PseudonymService,
284) -> Result<Vec<Record>, ApiError> {
285    let transcrypted = ps
286        .transcrypt_batch(
287            pep_message.record(),
288            &pep_message.sessions(),
289            &pep_message.domain(),
290            &pep_message.domain_to(),
291        )
292        .await
293        .handle_pseudonym_error()?;
294
295    let decrypted = ps.decrypt_batch(&transcrypted).handle_pseudonym_error()?;
296
297    decrypted
298        .iter()
299        .map(|pep_value| Record::from_pep_json(pep_value).map_err(ApiError::Internal))
300        .collect::<Result<Vec<_>, _>>()
301}
302
303#[derive(Debug)]
304pub struct ActivityBatchRequestMessageType;
305
306impl PEPBatchMessageType<EncryptedPEPJSONValue, Record> for ActivityBatchRequestMessageType {
307    type PEPBatchMessage = PEPActivityBatchRequest;
308    type BatchMessage = ActivityBatchRequest;
309
310    fn pack(
311        request: Self::BatchMessage,
312        domain_to: PseudonymizationDomain,
313        ps: &mut PseudonymService,
314    ) -> Result<Self::PEPBatchMessage, ApiError> {
315        let (record, sessions) = encrypt_records(&request.record, ps)?;
316        Ok(Self::PEPBatchMessage {
317            record,
318            activity: request.activity,
319            domain: request.domain,
320            domain_to,
321            sessions,
322        })
323    }
324
325    async fn unpack(
326        request: Self::PEPBatchMessage,
327        ps: &mut PseudonymService,
328    ) -> Result<Self::BatchMessage, ApiError> {
329        let domain = request.domain_to.clone();
330        let activity = request.activity.clone();
331        let record = decrypt_records(&request, ps).await?;
332        Ok(Self::BatchMessage {
333            record,
334            domain,
335            activity,
336        })
337    }
338}
339
340#[derive(Debug)]
341pub struct ActivityBatchResponseMessageType;
342
343impl PEPBatchMessageType<EncryptedPEPJSONValue, Record> for ActivityBatchResponseMessageType {
344    type PEPBatchMessage = PEPActivityBatchResponse;
345    type BatchMessage = ActivityBatchResponse;
346
347    fn pack(
348        request: Self::BatchMessage,
349        domain_to: PseudonymizationDomain,
350        ps: &mut PseudonymService,
351    ) -> Result<Self::PEPBatchMessage, ApiError> {
352        let (record, sessions) = encrypt_records(&request.record, ps)?;
353        Ok(Self::PEPBatchMessage {
354            record,
355            activity: request.activity,
356            domain: request.domain,
357            domain_to,
358            sessions,
359        })
360    }
361
362    async fn unpack(
363        request: Self::PEPBatchMessage,
364        ps: &mut PseudonymService,
365    ) -> Result<Self::BatchMessage, ApiError> {
366        let domain = request.domain_to.clone();
367        let activity = request.activity.clone();
368        let record = decrypt_records(&request, ps).await?;
369        Ok(Self::BatchMessage {
370            record,
371            domain,
372            activity,
373        })
374    }
375}