nest_data_source_api/
batch_messages.rs

1use crate::api::{ApiError, PseudonymServiceErrorHandler};
2use libpep::high_level::contexts::PseudonymizationDomain;
3use libpep::high_level::data_types::{
4    DataPoint, Encryptable, EncryptedDataPoint, EncryptedPseudonym, Pseudonym,
5};
6use libpep::high_level::ops::EncryptedEntityData;
7use log::error;
8use paas_client::pseudonym_service::PseudonymService;
9use paas_client::sessions::EncryptionContexts;
10use rand::{CryptoRng, RngCore};
11use serde::{Deserialize, Serialize};
12use serde_json::{from_str, json, Value};
13use serde_repr::{Deserialize_repr, Serialize_repr};
14use std::fmt::Debug;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize_repr, Deserialize_repr)]
17#[repr(u16)]
18pub enum StatusCode {
19    Ok = 200,
20    BadRequest = 400,
21    Unauthorized = 401,
22    Forbidden = 403,
23    NotFound = 404,
24    InternalError = 500,
25}
26
27#[derive(Debug, Serialize, Deserialize, Clone)]
28pub struct PEPPseudoDataPoint {
29    pub participant: Vec<EncryptedPseudonym>,
30    pub extra_data: Vec<EncryptedDataPoint>,
31}
32
33#[derive(Debug, Clone)]
34pub struct PseudoDataPoint {
35    pub participant: Vec<Pseudonym>,
36    pub extra_data: Value,
37    pub result_code: StatusCode,
38}
39
40impl PseudoDataPoint {
41    pub fn ok(participant: Vec<Pseudonym>, extra_data: Value) -> Self {
42        Self::with_code(participant, extra_data, StatusCode::Ok)
43    }
44
45    pub fn not_found(participant: Vec<Pseudonym>, extra_data: Value) -> Self {
46        Self::with_code(participant, extra_data, StatusCode::NotFound)
47    }
48
49    pub fn forbidden(participant: Vec<Pseudonym>, extra_data: Value) -> Self {
50        Self::with_code(participant, extra_data, StatusCode::Forbidden)
51    }
52
53    pub fn server_error(participant: Vec<Pseudonym>, extra_data: Value) -> Self {
54        Self::with_code(participant, extra_data, StatusCode::InternalError)
55    }
56
57    // Helper methods for more specific errors
58    pub fn bad_request(participant: Vec<Pseudonym>, error_message: &str) -> Self {
59        let error_data = json!({
60            "message": error_message
61        });
62        Self::with_code(participant, error_data, StatusCode::BadRequest)
63    }
64
65    pub fn data_processing_error(participant: Vec<Pseudonym>, error_message: &str) -> Self {
66        let error_data = json!({
67            "message": error_message
68        });
69        Self::with_code(participant, error_data, StatusCode::InternalError)
70    }
71
72    pub fn with_code(participant: Vec<Pseudonym>, extra_data: Value, code: StatusCode) -> Self {
73        Self {
74            participant,
75            extra_data,
76            result_code: code,
77        }
78    }
79}
80
81#[derive(Debug, Serialize, Deserialize)]
82pub struct PEPActivityBatchRequest {
83    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
84    pub activity: String,
85    pub sessions: EncryptionContexts,
86    pub domain: PseudonymizationDomain,
87    pub domain_to: PseudonymizationDomain,
88}
89
90#[derive(Debug)]
91pub struct ActivityBatchRequest {
92    pub pseudo_data_points: Vec<PseudoDataPoint>,
93    pub activity: String,
94    pub domain: PseudonymizationDomain,
95}
96
97#[derive(Debug)]
98pub struct ActivityBatchResponse {
99    pub pseudo_data_points: Vec<PseudoDataPoint>,
100    pub activity: String,
101    pub domain: PseudonymizationDomain,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105pub struct PEPActivityBatchResponse {
106    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
107    pub activity: String,
108    pub sessions: EncryptionContexts,
109    pub domain: PseudonymizationDomain,
110    pub domain_to: PseudonymizationDomain,
111}
112
113#[allow(dead_code)]
114pub trait HasBatchInfo<T> {
115    fn participants(&self) -> Vec<Vec<Pseudonym>>;
116    fn extra_data(&self) -> Vec<Value>;
117    fn pseudo_data_points(&self) -> Vec<T>;
118    fn domain(&self) -> PseudonymizationDomain;
119}
120
121#[allow(dead_code)]
122pub trait HasPEPBatchInfo<T> {
123    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>>;
124    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>>;
125    fn pseudo_data_points(&self) -> Vec<T>;
126
127    fn domain(&self) -> PseudonymizationDomain;
128    fn sessions(&self) -> EncryptionContexts;
129    fn domain_to(&self) -> PseudonymizationDomain;
130}
131
132pub trait PEPBatchMessageType<PepT, T>: Debug {
133    type PEPBatchMessage: HasPEPBatchInfo<PepT> + Serialize + Debug;
134    type BatchMessage: HasBatchInfo<T> + Debug;
135    #[allow(async_fn_in_trait)]
136    async fn pack<R: RngCore + CryptoRng>(
137        request: Self::BatchMessage,
138        domain_to: PseudonymizationDomain,
139        ps: &mut PseudonymService,
140        rng: &mut R,
141    ) -> Result<Self::PEPBatchMessage, ApiError>;
142
143    #[allow(async_fn_in_trait)]
144    async fn unpack(
145        request: Self::PEPBatchMessage,
146        ps: &mut PseudonymService,
147    ) -> Result<Self::BatchMessage, ApiError>;
148}
149
150impl HasBatchInfo<PseudoDataPoint> for ActivityBatchRequest {
151    fn participants(&self) -> Vec<Vec<Pseudonym>> {
152        self.pseudo_data_points
153            .iter()
154            .map(|r| r.participant.clone())
155            .collect()
156    }
157
158    fn extra_data(&self) -> Vec<Value> {
159        self.pseudo_data_points
160            .iter()
161            .map(|r| r.extra_data.clone())
162            .collect()
163    }
164
165    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
166        self.pseudo_data_points.clone()
167    }
168
169    fn domain(&self) -> PseudonymizationDomain {
170        self.domain.clone()
171    }
172}
173
174impl HasBatchInfo<PseudoDataPoint> for ActivityBatchResponse {
175    fn participants(&self) -> Vec<Vec<Pseudonym>> {
176        self.pseudo_data_points
177            .iter()
178            .map(|r| r.participant.clone())
179            .collect()
180    }
181
182    fn extra_data(&self) -> Vec<Value> {
183        self.pseudo_data_points
184            .iter()
185            .map(|r| r.extra_data.clone())
186            .collect()
187    }
188
189    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
190        self.pseudo_data_points.clone()
191    }
192
193    fn domain(&self) -> PseudonymizationDomain {
194        self.domain.clone()
195    }
196}
197
198impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPActivityBatchRequest {
199    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
200        self.pseudo_data_points
201            .iter()
202            .map(|r| r.participant.clone())
203            .collect()
204    }
205
206    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
207        self.pseudo_data_points
208            .iter()
209            .map(|r| r.extra_data.clone())
210            .collect()
211    }
212
213    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
214        self.pseudo_data_points.clone()
215    }
216
217    fn domain(&self) -> PseudonymizationDomain {
218        self.domain.clone()
219    }
220
221    fn sessions(&self) -> EncryptionContexts {
222        self.sessions.clone()
223    }
224
225    fn domain_to(&self) -> PseudonymizationDomain {
226        self.domain_to.clone()
227    }
228}
229
230impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPActivityBatchResponse {
231    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
232        self.pseudo_data_points
233            .iter()
234            .map(|r| r.participant.clone())
235            .collect()
236    }
237
238    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
239        self.pseudo_data_points
240            .iter()
241            .map(|r| r.extra_data.clone())
242            .collect()
243    }
244
245    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
246        self.pseudo_data_points.clone()
247    }
248
249    fn domain(&self) -> PseudonymizationDomain {
250        self.domain.clone()
251    }
252
253    fn sessions(&self) -> EncryptionContexts {
254        self.sessions.clone()
255    }
256
257    fn domain_to(&self) -> PseudonymizationDomain {
258        self.domain_to.clone()
259    }
260}
261
262#[derive(Debug)]
263pub struct ActivityBatchRequestMessageType;
264
265impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint> for ActivityBatchRequestMessageType {
266    type PEPBatchMessage = PEPActivityBatchRequest;
267    type BatchMessage = ActivityBatchRequest;
268
269    async fn pack<R: RngCore + CryptoRng>(
270        request: Self::BatchMessage,
271        domain_to: PseudonymizationDomain,
272        ps: &mut PseudonymService,
273        rng: &mut R,
274    ) -> Result<Self::PEPBatchMessage, ApiError> {
275        let mut pseudo_data_points = vec![];
276        let mut enc_contexts = vec![];
277        let mut has_successful_encryptions = false;
278
279        for pseudo_data_point in request.pseudo_data_points {
280            let mut encrypted_pseudonyms = vec![];
281            let mut encryption_failed = false;
282
283            // Encrypt pseudonyms
284            for pseudonym in &pseudo_data_point.participant {
285                match ps.encrypt(pseudonym, rng).await {
286                    Ok((enc_pseudonym, enc_context)) => {
287                        encrypted_pseudonyms.push(enc_pseudonym);
288                        enc_contexts.push(enc_context);
289                    }
290                    Err(e) => {
291                        error!("Failed to encrypt pseudonym: {:?}", e);
292                        encryption_failed = true;
293                        break;
294                    }
295                }
296            }
297
298            if encryption_failed {
299                // For now we just skip this item, we don't really used this method. Since encrypting a request is not yet something we do
300                // What should do here really depends on the context, if we actually use it
301                continue;
302            }
303
304            // Encrypt data points
305            let mut encrypted_extra_data = vec![];
306            let extra_data_w_error_code = Value::Object({
307                let mut map = pseudo_data_point.extra_data.as_object().unwrap().clone();
308                map.insert(
309                    "result_code".to_string(),
310                    json!(pseudo_data_point.result_code),
311                );
312                map
313            });
314            let datapoints =
315                DataPoint::from_string_padded(extra_data_w_error_code.to_string().as_str());
316            let mut data_encryption_failed = false;
317
318            for datapoint in datapoints {
319                match ps.encrypt(&datapoint, rng).await {
320                    Ok((encrypted_data_point, enc_context)) => {
321                        encrypted_extra_data.push(encrypted_data_point);
322                        enc_contexts.push(enc_context);
323                    }
324                    Err(e) => {
325                        error!("Failed to encrypt data point: {:?}", e);
326                        data_encryption_failed = true;
327                        break;
328                    }
329                }
330            }
331
332            if data_encryption_failed {
333                // See comment above
334                continue;
335            }
336
337            has_successful_encryptions = true;
338            pseudo_data_points.push(PEPPseudoDataPoint {
339                participant: encrypted_pseudonyms,
340                extra_data: encrypted_extra_data,
341            });
342        }
343
344        // If we couldn't encrypt any items, fail the entire batch
345        if !has_successful_encryptions {
346            return Err(ApiError::Internal(
347                "Failed to encrypt any data points".to_string(),
348            ));
349        }
350
351        // Get first encryption context for session information
352        let sessions = enc_contexts
353            .first()
354            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
355
356        // Validate all encryption contexts match
357        if !enc_contexts
358            .iter()
359            .all(|enc_context| enc_context == sessions)
360        {
361            return Err(ApiError::Internal(
362                "Encryption contexts do not match".to_string(),
363            ));
364        }
365
366        Ok(Self::PEPBatchMessage {
367            pseudo_data_points,
368            activity: request.activity,
369            domain: request.domain,
370            domain_to,
371            sessions: sessions.clone(),
372        })
373    }
374
375    async fn unpack(
376        request: Self::PEPBatchMessage,
377        ps: &mut PseudonymService,
378    ) -> Result<Self::BatchMessage, ApiError> {
379        let encryptables: Vec<EncryptedEntityData> = request
380            .pseudo_data_points
381            .iter()
382            .map(|r| (r.participant.clone(), r.extra_data.clone()))
383            .collect();
384
385        let transcrypted = ps
386            .transcrypt(
387                &encryptables,
388                &request.sessions,
389                &request.domain,
390                &request.domain_to,
391            )
392            .await
393            .handle_pseudonym_error()?;
394
395        let mut pseudo_data_points_result = vec![];
396
397        for (pseudonyms, data_points) in transcrypted {
398            let mut decrypted_pseudonyms = vec![];
399            let mut decryption_failed = false;
400
401            // Attempt to decrypt all pseudonyms
402            for pseu in pseudonyms {
403                match ps.decrypt(&pseu).await {
404                    Ok(decrypted) => decrypted_pseudonyms.push(decrypted),
405                    Err(e) => {
406                        error!("Failed to decrypt pseudonym: {:?}", e);
407                        decryption_failed = true;
408                        break;
409                    }
410                }
411            }
412
413            // If pseudonym decryption failed, create an error response
414            if decryption_failed {
415                pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
416                    vec![], // Empty pseudonyms since we couldn't decrypt them
417                    "Failed to decrypt pseudonym",
418                ));
419                continue; // Skip to next item
420            }
421
422            // Try to decrypt data points
423            let mut decrypted_data_points = vec![];
424            let mut data_decryption_failed = false;
425
426            for datap in data_points {
427                match ps.decrypt(&datap).await {
428                    Ok(decrypted) => decrypted_data_points.push(decrypted),
429                    Err(e) => {
430                        error!("Failed to decrypt data point: {:?}", e);
431                        data_decryption_failed = true;
432                        break;
433                    }
434                }
435            }
436
437            // If data point decryption failed, create an error response
438            if data_decryption_failed {
439                pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
440                    decrypted_pseudonyms,
441                    "Failed to decrypt data points",
442                ));
443                continue;
444            }
445
446            // Try to convert data points to string
447            let data_string = match DataPoint::to_string_padded(&decrypted_data_points) {
448                Ok(s) => s,
449                Err(e) => {
450                    error!("Failed to convert data points to string: {:?}", e);
451                    pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
452                        decrypted_pseudonyms,
453                        "Failed to convert data points to string",
454                    ));
455                    continue;
456                }
457            };
458
459            // Try to parse JSON
460            let data: Value = match from_str(&data_string) {
461                Ok(v) => v,
462                Err(e) => {
463                    error!("Failed to parse JSON: {:?}", e);
464                    pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
465                        decrypted_pseudonyms,
466                        &format!("Failed to parse data as JSON: {}", e),
467                    ));
468                    continue;
469                }
470            };
471
472            // Successfully processed this item
473            pseudo_data_points_result.push(PseudoDataPoint::ok(decrypted_pseudonyms, data));
474        }
475
476        Ok(Self::BatchMessage {
477            pseudo_data_points: pseudo_data_points_result,
478            domain: request.domain_to,
479            activity: request.activity,
480        })
481    }
482}
483
484#[derive(Debug)]
485pub struct ActivityBatchResponseMessageType;
486
487impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint> for ActivityBatchResponseMessageType {
488    type PEPBatchMessage = PEPActivityBatchResponse;
489    type BatchMessage = ActivityBatchResponse;
490
491    async fn pack<R: RngCore + CryptoRng>(
492        request: Self::BatchMessage,
493        domain_to: PseudonymizationDomain,
494        ps: &mut PseudonymService,
495        rng: &mut R,
496    ) -> Result<Self::PEPBatchMessage, ApiError> {
497        let mut pseudo_data_points = vec![];
498        let mut enc_contexts = vec![];
499        let mut has_successful_encryptions = false;
500
501        for pseudo_data_point in request.pseudo_data_points {
502            let mut encrypted_pseudonyms = vec![];
503            let mut encryption_failed = false;
504
505            // Encrypt pseudonyms
506            for pseudonym in &pseudo_data_point.participant {
507                match ps.encrypt(pseudonym, rng).await {
508                    Ok((enc_pseudonym, enc_context)) => {
509                        encrypted_pseudonyms.push(enc_pseudonym);
510                        enc_contexts.push(enc_context);
511                    }
512                    Err(e) => {
513                        error!("Failed to encrypt pseudonym: {:?}", e);
514                        encryption_failed = true;
515                        break;
516                    }
517                }
518            }
519
520            if encryption_failed {
521                // This should really not happen. If it does, we should probably fail the entire batch
522                return Err(ApiError::Internal(
523                    "Failed to encrypt psuedonym. Something was corrupted".to_string(),
524                ));
525            }
526
527            // Encrypt data points
528            let mut encrypted_extra_data = vec![];
529            let extra_data_w_error_code = Value::Object({
530                let mut map = pseudo_data_point.extra_data.as_object().unwrap().clone();
531                map.insert(
532                    "result_code".to_string(),
533                    json!(pseudo_data_point.result_code),
534                );
535                map
536            });
537            let datapoints =
538                DataPoint::from_string_padded(extra_data_w_error_code.to_string().as_str());
539            let mut data_encryption_failed = false;
540
541            for datapoint in datapoints {
542                match ps.encrypt(&datapoint, rng).await {
543                    Ok((encrypted_data_point, enc_context)) => {
544                        encrypted_extra_data.push(encrypted_data_point);
545                        enc_contexts.push(enc_context);
546                    }
547                    Err(e) => {
548                        error!("Failed to encrypt data point: {:?}", e);
549                        data_encryption_failed = true;
550                        break;
551                    }
552                }
553            }
554
555            if data_encryption_failed {
556                // This should really not happen. If it does, we should probably fail the entire batch
557                return Err(ApiError::Internal(
558                    "Failed to encrypt data point".to_string(),
559                ));
560            }
561
562            // Successfully encrypted this item
563            has_successful_encryptions = true;
564            pseudo_data_points.push(PEPPseudoDataPoint {
565                participant: encrypted_pseudonyms,
566                extra_data: encrypted_extra_data,
567            });
568        }
569
570        // If we couldn't encrypt any items, fail the entire batch
571        if !has_successful_encryptions {
572            return Err(ApiError::Internal(
573                "Failed to encrypt any data points".to_string(),
574            ));
575        }
576
577        // Get first encryption context for session information
578        let sessions = enc_contexts
579            .first()
580            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
581
582        // Validate all encryption contexts match
583        if !enc_contexts
584            .iter()
585            .all(|enc_context| enc_context == sessions)
586        {
587            return Err(ApiError::Internal(
588                "Encryption contexts do not match".to_string(),
589            ));
590        }
591
592        Ok(Self::PEPBatchMessage {
593            pseudo_data_points,
594            activity: request.activity,
595            domain: request.domain,
596            domain_to,
597            sessions: sessions.clone(),
598        })
599    }
600
601    async fn unpack(
602        request: Self::PEPBatchMessage,
603        ps: &mut PseudonymService,
604    ) -> Result<Self::BatchMessage, ApiError> {
605        let encryptables: Vec<EncryptedEntityData> = request
606            .pseudo_data_points
607            .iter()
608            .map(|r| (r.participant.clone(), r.extra_data.clone()))
609            .collect();
610
611        let transcrypted = ps
612            .transcrypt(
613                &encryptables,
614                &request.sessions,
615                &request.domain,
616                &request.domain_to,
617            )
618            .await
619            .handle_pseudonym_error()?;
620
621        let mut pseudo_data_points_result = vec![];
622
623        for (pseudonyms, data_points) in transcrypted {
624            let mut decrypted_pseudonyms = vec![];
625            let mut decryption_failed = false;
626
627            // Attempt to decrypt all pseudonyms
628            for pseu in pseudonyms {
629                match ps.decrypt(&pseu).await {
630                    Ok(decrypted) => decrypted_pseudonyms.push(decrypted),
631                    Err(e) => {
632                        error!("Failed to decrypt pseudonym: {:?}", e);
633                        decryption_failed = true;
634                        break;
635                    }
636                }
637            }
638
639            // If pseudonym decryption failed, create an error response
640            if decryption_failed {
641                pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
642                    vec![], // Empty pseudonyms since we couldn't decrypt them
643                    "Failed to decrypt pseudonym",
644                ));
645                continue; // Skip to next item
646            }
647
648            // Try to decrypt data points
649            let mut decrypted_data_points = vec![];
650            let mut data_decryption_failed = false;
651
652            for datap in data_points {
653                match ps.decrypt(&datap).await {
654                    Ok(decrypted) => decrypted_data_points.push(decrypted),
655                    Err(e) => {
656                        error!("Failed to decrypt data point: {:?}", e);
657                        data_decryption_failed = true;
658                        break;
659                    }
660                }
661            }
662
663            // If data point decryption failed, create an error response
664            if data_decryption_failed {
665                pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
666                    decrypted_pseudonyms,
667                    "Failed to decrypt data points",
668                ));
669                continue;
670            }
671
672            // Try to convert data points to string
673            let data_string = match DataPoint::to_string_padded(&decrypted_data_points) {
674                Ok(s) => s,
675                Err(e) => {
676                    error!("Failed to convert data points to string: {:?}", e);
677                    pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
678                        decrypted_pseudonyms,
679                        "Failed to convert data points to string",
680                    ));
681                    continue;
682                }
683            };
684
685            // Try to parse JSON
686            let data: Value = match from_str(&data_string) {
687                Ok(v) => v,
688                Err(e) => {
689                    error!("Failed to parse JSON: {:?}", e);
690                    pseudo_data_points_result.push(PseudoDataPoint::data_processing_error(
691                        decrypted_pseudonyms,
692                        &format!("Failed to parse data as JSON: {}", e),
693                    ));
694                    continue;
695                }
696            };
697
698            // Successfully processed this item
699            pseudo_data_points_result.push(PseudoDataPoint::ok(decrypted_pseudonyms, data));
700        }
701
702        Ok(Self::BatchMessage {
703            pseudo_data_points: pseudo_data_points_result,
704            domain: request.domain_to,
705            activity: request.activity,
706        })
707    }
708}