Skip to main content

nest_data_source_api/
batch_messages.rs

1use crate::api::{ApiError, PseudonymServiceErrorHandler};
2use libpep::data::json::{EncryptedPEPJSONValue, PEPJSONBuilder, PEPJSONValue};
3use libpep::data::simple::{ElGamalEncryptable, Pseudonym};
4use libpep::factors::PseudonymizationDomain;
5use paas_client::pseudonym_service::PseudonymService;
6use paas_client::sessions::EncryptionContexts;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use serde_repr::{Deserialize_repr, Serialize_repr};
10use std::fmt::Debug;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize_repr, Deserialize_repr)]
13#[repr(u16)]
14pub enum StatusCode {
15    Ok = 200,
16    BadRequest = 400,
17    Unauthorized = 401,
18    Forbidden = 403,
19    NotFound = 404,
20    InternalError = 500,
21}
22
23#[derive(Debug, Clone)]
24pub struct Record {
25    pub participant: Pseudonym,
26    pub extra_data: Value,
27    pub result_code: StatusCode,
28}
29
30impl Record {
31    pub fn ok(participant: Pseudonym, extra_data: Value) -> Self {
32        Self::with_code(participant, extra_data, StatusCode::Ok)
33    }
34
35    pub fn not_found(participant: Pseudonym, extra_data: Value) -> Self {
36        Self::with_code(participant, extra_data, StatusCode::NotFound)
37    }
38
39    pub fn forbidden(participant: Pseudonym, extra_data: Value) -> Self {
40        Self::with_code(participant, extra_data, StatusCode::Forbidden)
41    }
42
43    pub fn server_error(participant: Pseudonym, extra_data: Value) -> Self {
44        Self::with_code(participant, extra_data, StatusCode::InternalError)
45    }
46
47    pub fn bad_request(participant: Pseudonym, error_message: &str) -> Self {
48        let error_data = json!({
49            "message": error_message
50        });
51        Self::with_code(participant, error_data, StatusCode::BadRequest)
52    }
53
54    pub fn data_processing_error(participant: Pseudonym, error_message: &str) -> Self {
55        let error_data = json!({
56            "message": error_message
57        });
58        Self::with_code(participant, error_data, StatusCode::InternalError)
59    }
60
61    pub fn with_code(participant: Pseudonym, extra_data: Value, code: StatusCode) -> Self {
62        Self {
63            participant,
64            extra_data,
65            result_code: code,
66        }
67    }
68
69    /// Convert to a PEPJSONValue with participants marked as pseudonyms.
70    fn to_pep_json(&self) -> Result<PEPJSONValue, String> {
71        let fields = json!({
72            "participants": self.participant,
73            "extra_data": self.extra_data,
74            "result_code": self.result_code as u16,
75        });
76        Ok(PEPJSONBuilder::from_json(&fields, &["participants"])
77            .ok_or("Failed to build PEPJSONValue".to_string())?
78            .build())
79    }
80
81    /// Try to create from a decrypted PEPJSONValue.
82    fn from_pep_json(pep_value: &PEPJSONValue) -> Result<Self, String> {
83        let json_value = pep_value.to_value().map_err(|e| e.to_string())?;
84
85        let participant = Pseudonym::from_hex(
86            json_value
87                .get("participants")
88                .ok_or("Missing participants field".to_string())?
89                .as_str()
90                .ok_or("Participant field is not a string".to_string())?,
91        )
92        .ok_or("Invalid pseudonym hex".to_string())?;
93
94        let extra_data = json_value.get("extra_data").cloned().unwrap_or(Value::Null);
95
96        let result_code = json_value
97            .get("result_code")
98            .and_then(|v| v.as_u64())
99            .map(|code| match code {
100                200 => StatusCode::Ok,
101                400 => StatusCode::BadRequest,
102                401 => StatusCode::Unauthorized,
103                403 => StatusCode::Forbidden,
104                404 => StatusCode::NotFound,
105                _ => StatusCode::InternalError,
106            })
107            .unwrap_or(StatusCode::Ok);
108
109        Ok(Self {
110            participant,
111            extra_data,
112            result_code,
113        })
114    }
115}
116
117#[derive(Debug, Serialize, Deserialize)]
118pub struct PEPActivityBatchRequest {
119    pub record: Vec<EncryptedPEPJSONValue>,
120    pub activity: String,
121    pub sessions: EncryptionContexts,
122    pub domain: PseudonymizationDomain,
123    pub domain_to: PseudonymizationDomain,
124}
125
126#[derive(Debug)]
127pub struct ActivityBatchRequest {
128    pub record: Vec<Record>,
129    pub activity: String,
130    pub domain: PseudonymizationDomain,
131}
132
133#[derive(Debug)]
134pub struct ActivityBatchResponse {
135    pub record: Vec<Record>,
136    pub activity: String,
137    pub domain: PseudonymizationDomain,
138}
139
140#[derive(Debug, Serialize, Deserialize)]
141pub struct PEPActivityBatchResponse {
142    pub record: Vec<EncryptedPEPJSONValue>,
143    pub activity: String,
144    pub sessions: EncryptionContexts,
145    pub domain: PseudonymizationDomain,
146    pub domain_to: PseudonymizationDomain,
147}
148
149#[allow(dead_code)]
150pub trait HasBatchInfo<T> {
151    fn participants(&self) -> Vec<Pseudonym>;
152    fn extra_data(&self) -> Vec<Value>;
153    fn record(&self) -> Vec<T>;
154    fn domain(&self) -> PseudonymizationDomain;
155}
156
157#[allow(dead_code)]
158pub trait HasPEPBatchInfo<T> {
159    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue>;
160    fn record(&self) -> Vec<T>;
161
162    fn domain(&self) -> PseudonymizationDomain;
163    fn sessions(&self) -> EncryptionContexts;
164    fn domain_to(&self) -> PseudonymizationDomain;
165}
166
167pub trait PEPBatchMessageType<PepT, T>: Debug {
168    type PEPBatchMessage: HasPEPBatchInfo<PepT> + Serialize + Debug;
169    type BatchMessage: HasBatchInfo<T> + Debug;
170
171    fn pack(
172        request: Self::BatchMessage,
173        domain_to: PseudonymizationDomain,
174        ps: &mut PseudonymService,
175    ) -> Result<Self::PEPBatchMessage, ApiError>;
176
177    #[allow(async_fn_in_trait)]
178    async fn unpack(
179        request: Self::PEPBatchMessage,
180        ps: &mut PseudonymService,
181    ) -> Result<Self::BatchMessage, ApiError>;
182}
183
184impl HasBatchInfo<Record> for ActivityBatchRequest {
185    fn participants(&self) -> Vec<Pseudonym> {
186        self.record.iter().map(|r| r.participant).collect()
187    }
188
189    fn extra_data(&self) -> Vec<Value> {
190        self.record.iter().map(|r| r.extra_data.clone()).collect()
191    }
192
193    fn record(&self) -> Vec<Record> {
194        self.record.clone()
195    }
196
197    fn domain(&self) -> PseudonymizationDomain {
198        self.domain.clone()
199    }
200}
201
202impl HasBatchInfo<Record> for ActivityBatchResponse {
203    fn participants(&self) -> Vec<Pseudonym> {
204        self.record.iter().map(|r| r.participant).collect()
205    }
206
207    fn extra_data(&self) -> Vec<Value> {
208        self.record.iter().map(|r| r.extra_data.clone()).collect()
209    }
210
211    fn record(&self) -> Vec<Record> {
212        self.record.clone()
213    }
214
215    fn domain(&self) -> PseudonymizationDomain {
216        self.domain.clone()
217    }
218}
219
220impl HasPEPBatchInfo<EncryptedPEPJSONValue> for PEPActivityBatchRequest {
221    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue> {
222        self.record.clone()
223    }
224
225    fn record(&self) -> Vec<EncryptedPEPJSONValue> {
226        self.record.clone()
227    }
228
229    fn domain(&self) -> PseudonymizationDomain {
230        self.domain.clone()
231    }
232
233    fn sessions(&self) -> EncryptionContexts {
234        self.sessions.clone()
235    }
236
237    fn domain_to(&self) -> PseudonymizationDomain {
238        self.domain_to.clone()
239    }
240}
241
242impl HasPEPBatchInfo<EncryptedPEPJSONValue> for PEPActivityBatchResponse {
243    fn encrypted_data(&self) -> Vec<EncryptedPEPJSONValue> {
244        self.record.clone()
245    }
246
247    fn record(&self) -> Vec<EncryptedPEPJSONValue> {
248        self.record.clone()
249    }
250
251    fn domain(&self) -> PseudonymizationDomain {
252        self.domain.clone()
253    }
254
255    fn sessions(&self) -> EncryptionContexts {
256        self.sessions.clone()
257    }
258
259    fn domain_to(&self) -> PseudonymizationDomain {
260        self.domain_to.clone()
261    }
262}
263
264fn encrypt_records(
265    records: &[Record],
266    ps: &mut PseudonymService,
267) -> Result<(Vec<EncryptedPEPJSONValue>, EncryptionContexts), ApiError> {
268    if records.is_empty() {
269        return Err(ApiError::Internal("No data points to encrypt".to_string()));
270    }
271
272    let mut rng = rand::rng();
273
274    let json_records: Vec<PEPJSONValue> = records
275        .iter()
276        .map(|r| r.to_pep_json())
277        .collect::<Result<Vec<_>, _>>()
278        .map_err(ApiError::Internal)?;
279
280    ps.encrypt_batch(&json_records, &mut rng)
281        .handle_pseudonym_error()
282}
283
284async fn decrypt_records(
285    pep_message: &impl HasPEPBatchInfo<EncryptedPEPJSONValue>,
286    ps: &mut PseudonymService,
287) -> Result<Vec<Record>, ApiError> {
288    let transcrypted = ps
289        .transcrypt_batch(
290            pep_message.record(),
291            &pep_message.sessions(),
292            &pep_message.domain(),
293            &pep_message.domain_to(),
294        )
295        .await
296        .handle_pseudonym_error()?;
297
298    let decrypted = ps.decrypt_batch(&transcrypted).handle_pseudonym_error()?;
299
300    decrypted
301        .iter()
302        .map(|pep_value| Record::from_pep_json(pep_value).map_err(ApiError::Internal))
303        .collect::<Result<Vec<_>, _>>()
304}
305
306#[derive(Debug)]
307pub struct ActivityBatchRequestMessageType;
308
309impl PEPBatchMessageType<EncryptedPEPJSONValue, Record> for ActivityBatchRequestMessageType {
310    type PEPBatchMessage = PEPActivityBatchRequest;
311    type BatchMessage = ActivityBatchRequest;
312
313    fn pack(
314        request: Self::BatchMessage,
315        domain_to: PseudonymizationDomain,
316        ps: &mut PseudonymService,
317    ) -> Result<Self::PEPBatchMessage, ApiError> {
318        let (record, sessions) = encrypt_records(&request.record, ps)?;
319        Ok(Self::PEPBatchMessage {
320            record,
321            activity: request.activity,
322            domain: request.domain,
323            domain_to,
324            sessions,
325        })
326    }
327
328    async fn unpack(
329        request: Self::PEPBatchMessage,
330        ps: &mut PseudonymService,
331    ) -> Result<Self::BatchMessage, ApiError> {
332        let domain = request.domain_to.clone();
333        let activity = request.activity.clone();
334        let record = decrypt_records(&request, ps).await?;
335        Ok(Self::BatchMessage {
336            record,
337            domain,
338            activity,
339        })
340    }
341}
342
343#[derive(Debug)]
344pub struct ActivityBatchResponseMessageType;
345
346impl PEPBatchMessageType<EncryptedPEPJSONValue, Record> for ActivityBatchResponseMessageType {
347    type PEPBatchMessage = PEPActivityBatchResponse;
348    type BatchMessage = ActivityBatchResponse;
349
350    fn pack(
351        request: Self::BatchMessage,
352        domain_to: PseudonymizationDomain,
353        ps: &mut PseudonymService,
354    ) -> Result<Self::PEPBatchMessage, ApiError> {
355        let (record, sessions) = encrypt_records(&request.record, ps)?;
356        Ok(Self::PEPBatchMessage {
357            record,
358            activity: request.activity,
359            domain: request.domain,
360            domain_to,
361            sessions,
362        })
363    }
364
365    async fn unpack(
366        request: Self::PEPBatchMessage,
367        ps: &mut PseudonymService,
368    ) -> Result<Self::BatchMessage, ApiError> {
369        let domain = request.domain_to.clone();
370        let activity = request.activity.clone();
371        let record = decrypt_records(&request, ps).await?;
372        Ok(Self::BatchMessage {
373            record,
374            domain,
375            activity,
376        })
377    }
378}