nest_data_source_api/
batch_messages.rs

1use crate::api::{ApiError, PseudonymServiceErrorHandler};
2use libpep::high_level::contexts::PseudonymizationDomain;
3use libpep::high_level::data_types::{
4    DataPoint, Encryptable, EncryptedDataPoint, EncryptedPseudonym, Pseudonym,
5};
6use libpep::high_level::ops::EncryptedEntityData;
7use paas_client::pseudonym_service::PseudonymService;
8use paas_client::sessions::EncryptionContexts;
9use rand::{CryptoRng, RngCore};
10use serde::{Deserialize, Serialize};
11use serde_json::{from_str, Value};
12use std::fmt::Debug;
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub struct PEPPseudoDataPoint {
16    pub participant: Vec<EncryptedPseudonym>,
17    pub extra_data: Vec<EncryptedDataPoint>,
18}
19
20#[derive(Debug, Clone)]
21pub struct PseudoDataPoint {
22    pub participant: Vec<Pseudonym>,
23    pub extra_data: Value,
24}
25
26#[derive(Debug, Serialize, Deserialize)]
27pub struct PEPParticipantBatchRequest {
28    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
29    pub sessions: EncryptionContexts,
30    pub domain: PseudonymizationDomain,
31    pub domain_to: PseudonymizationDomain, // We need to know in which domain to create the participant. It may often be 1-1 with the data source, but not always. It may also be 1-1 with the column, but not always.
32}
33
34#[derive(Debug)]
35pub struct ParticipantBatchRequest {
36    pub pseudo_data_points: Vec<PseudoDataPoint>,
37    pub domain: PseudonymizationDomain,
38}
39
40#[derive(Debug)]
41pub struct ParticipantBatchResponse {
42    pub pseudo_data_points: Vec<PseudoDataPoint>,
43    pub domain: PseudonymizationDomain,
44}
45
46#[derive(Debug, Serialize, Deserialize)]
47pub struct PEPParticipantBatchResponse {
48    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
49    pub sessions: EncryptionContexts,
50    pub domain: PseudonymizationDomain,
51    pub domain_to: PseudonymizationDomain,
52}
53
54// Activity Batch Message Structs
55
56#[derive(Debug, Serialize, Deserialize)]
57pub struct PEPActivityBatchRequest {
58    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
59    pub activity: String,
60    pub sessions: EncryptionContexts,
61    pub domain: PseudonymizationDomain,
62    pub domain_to: PseudonymizationDomain,
63}
64
65#[derive(Debug)]
66pub struct ActivityBatchRequest {
67    pub pseudo_data_points: Vec<PseudoDataPoint>,
68    pub activity: String,
69    pub domain: PseudonymizationDomain,
70}
71
72#[derive(Debug)]
73pub struct ActivityBatchResponse {
74    pub pseudo_data_points: Vec<PseudoDataPoint>,
75    pub activity: String,
76    pub domain: PseudonymizationDomain,
77}
78
79#[derive(Debug, Serialize, Deserialize)]
80pub struct PEPActivityBatchResponse {
81    pub pseudo_data_points: Vec<PEPPseudoDataPoint>,
82    pub activity: String,
83    pub sessions: EncryptionContexts,
84    pub domain: PseudonymizationDomain,
85    pub domain_to: PseudonymizationDomain,
86}
87
88#[allow(dead_code)]
89pub trait HasBatchInfo<T> {
90    fn participants(&self) -> Vec<Vec<Pseudonym>>;
91    fn extra_data(&self) -> Vec<Value>;
92    fn pseudo_data_points(&self) -> Vec<T>;
93    fn domain(&self) -> PseudonymizationDomain;
94}
95
96// HasPEPBatchInfo already has domain_to, but might need access to encrypted participants
97#[allow(dead_code)]
98pub trait HasPEPBatchInfo<T> {
99    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>>;
100    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>>;
101    fn pseudo_data_points(&self) -> Vec<T>;
102
103    fn domain(&self) -> PseudonymizationDomain;
104    fn sessions(&self) -> EncryptionContexts;
105    fn domain_to(&self) -> PseudonymizationDomain;
106}
107
108impl HasBatchInfo<PseudoDataPoint> for ParticipantBatchRequest {
109    fn participants(&self) -> Vec<Vec<Pseudonym>> {
110        self.pseudo_data_points
111            .iter()
112            .map(|r| r.participant.clone())
113            .collect()
114    }
115
116    fn extra_data(&self) -> Vec<Value> {
117        self.pseudo_data_points
118            .iter()
119            .map(|r| r.extra_data.clone())
120            .collect()
121    }
122
123    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
124        self.pseudo_data_points.clone()
125    }
126
127    fn domain(&self) -> PseudonymizationDomain {
128        self.domain.clone()
129    }
130}
131
132impl HasBatchInfo<PseudoDataPoint> for ParticipantBatchResponse {
133    fn participants(&self) -> Vec<Vec<Pseudonym>> {
134        self.pseudo_data_points
135            .iter()
136            .map(|r| r.participant.clone())
137            .collect()
138    }
139
140    fn extra_data(&self) -> Vec<Value> {
141        self.pseudo_data_points
142            .iter()
143            .map(|r| r.extra_data.clone())
144            .collect()
145    }
146
147    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
148        self.pseudo_data_points.clone()
149    }
150
151    fn domain(&self) -> PseudonymizationDomain {
152        self.domain.clone()
153    }
154}
155
156impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPParticipantBatchRequest {
157    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
158        self.pseudo_data_points
159            .iter()
160            .map(|r| r.participant.clone())
161            .collect()
162    }
163
164    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
165        self.pseudo_data_points
166            .iter()
167            .map(|r| r.extra_data.clone())
168            .collect()
169    }
170
171    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
172        self.pseudo_data_points.clone()
173    }
174    fn domain(&self) -> PseudonymizationDomain {
175        self.domain.clone()
176    }
177
178    fn sessions(&self) -> EncryptionContexts {
179        self.sessions.clone()
180    }
181
182    fn domain_to(&self) -> PseudonymizationDomain {
183        self.domain_to.clone()
184    }
185}
186
187impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPParticipantBatchResponse {
188    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
189        self.pseudo_data_points
190            .iter()
191            .map(|r| r.participant.clone())
192            .collect()
193    }
194
195    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
196        self.pseudo_data_points
197            .iter()
198            .map(|r| r.extra_data.clone())
199            .collect()
200    }
201
202    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
203        self.pseudo_data_points.clone()
204    }
205
206    fn domain(&self) -> PseudonymizationDomain {
207        self.domain.clone()
208    }
209
210    fn sessions(&self) -> EncryptionContexts {
211        self.sessions.clone()
212    }
213
214    fn domain_to(&self) -> PseudonymizationDomain {
215        self.domain_to.clone()
216    }
217}
218
219pub trait PEPBatchMessageType<PepT, T>: Debug {
220    type PEPBatchMessage: HasPEPBatchInfo<PepT> + Serialize + Debug;
221    type BatchMessage: HasBatchInfo<T> + Debug;
222    #[allow(async_fn_in_trait)]
223    async fn pack<R: RngCore + CryptoRng>(
224        request: Self::BatchMessage,
225        domain_to: PseudonymizationDomain,
226        ps: &mut PseudonymService,
227        rng: &mut R,
228    ) -> Result<Self::PEPBatchMessage, ApiError>;
229
230    #[allow(async_fn_in_trait)]
231    async fn unpack(
232        request: Self::PEPBatchMessage,
233        ps: &mut PseudonymService,
234    ) -> Result<Self::BatchMessage, ApiError>;
235}
236
237#[derive(Debug)]
238pub struct ParticipantBatchRequestMessageType;
239
240impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint>
241    for ParticipantBatchRequestMessageType
242{
243    type PEPBatchMessage = PEPParticipantBatchRequest;
244    type BatchMessage = ParticipantBatchRequest;
245    async fn pack<R: RngCore + CryptoRng>(
246        request: Self::BatchMessage,
247        domain_to: PseudonymizationDomain,
248        ps: &mut PseudonymService,
249        rng: &mut R,
250    ) -> Result<Self::PEPBatchMessage, ApiError> {
251        let mut pseudo_data_points = vec![];
252        let mut enc_contexts = vec![];
253
254        for pseudo_data_point in request.pseudo_data_points {
255            let mut encrypted_pseudonyms = vec![];
256            for pseudonym in pseudo_data_point.participant {
257                let (enc_pseudonym, enc_context) =
258                    ps.encrypt(&pseudonym, rng).await.handle_pseudonym_error()?;
259                encrypted_pseudonyms.push(enc_pseudonym);
260                enc_contexts.push(enc_context);
261            }
262
263            let mut encrypted_extra_data = vec![];
264            let datapoints =
265                DataPoint::from_string_padded(pseudo_data_point.extra_data.to_string().as_str());
266
267            for datapoint in datapoints {
268                let (encrypted_data_point, enc_context) =
269                    ps.encrypt(&datapoint, rng).await.handle_pseudonym_error()?;
270                encrypted_extra_data.push(encrypted_data_point);
271                enc_contexts.push(enc_context);
272            }
273
274            pseudo_data_points.push(PEPPseudoDataPoint {
275                participant: encrypted_pseudonyms,
276                extra_data: encrypted_extra_data,
277            })
278        }
279
280        let sessions = enc_contexts
281            .first()
282            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
283
284        if !enc_contexts
285            .iter()
286            .all(|enc_context| enc_context == sessions)
287        {
288            return Err(ApiError::Internal(
289                "Encryption contexts do not match".to_string(),
290            ));
291        }
292
293        Ok(Self::PEPBatchMessage {
294            pseudo_data_points,
295            domain: request.domain,
296            domain_to,
297            sessions: sessions.clone(),
298        })
299    }
300    async fn unpack(
301        request: Self::PEPBatchMessage,
302        ps: &mut PseudonymService,
303    ) -> Result<Self::BatchMessage, ApiError> {
304        let encryptables: Vec<EncryptedEntityData> = request
305            .pseudo_data_points
306            .iter()
307            .map(|r| (r.participant.clone(), r.extra_data.clone()))
308            .collect();
309
310        let transcrypted = ps
311            .transcrypt(
312                &encryptables,
313                &request.sessions,
314                &request.domain,
315                &request.domain_to,
316            )
317            .await
318            .handle_pseudonym_error()?;
319
320        let mut pseudo_data_points_result = vec![];
321
322        for (pseudonyms, data_points) in transcrypted {
323            let mut decrypted_pseudonyms = vec![];
324            for pseu in pseudonyms {
325                let decrypted = ps.decrypt(&pseu).await.handle_pseudonym_error()?;
326                decrypted_pseudonyms.push(decrypted);
327            }
328
329            let mut decrypted_data_points = vec![];
330            for datap in data_points {
331                let decrypted = ps.decrypt(&datap).await.handle_pseudonym_error()?;
332                decrypted_data_points.push(decrypted);
333            }
334            let data_string =
335                DataPoint::to_string_padded(&decrypted_data_points).map_err(|_| {
336                    ApiError::Internal("Failed to convert data points to string".to_string())
337                })?;
338
339            let data: Value = from_str(&data_string)
340                .map_err(|e| ApiError::Internal(format!("Failed to parse extra data: {}", e)))?;
341
342            pseudo_data_points_result.push(PseudoDataPoint {
343                participant: decrypted_pseudonyms,
344                extra_data: data,
345            })
346        }
347
348        Ok(Self::BatchMessage {
349            pseudo_data_points: pseudo_data_points_result,
350            domain: request.domain_to,
351        })
352    }
353}
354
355#[derive(Debug)]
356pub struct ParticipantBatchResponseMessageType;
357
358impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint>
359    for ParticipantBatchResponseMessageType
360{
361    type PEPBatchMessage = PEPParticipantBatchResponse;
362    type BatchMessage = ParticipantBatchResponse;
363
364    async fn pack<R: RngCore + CryptoRng>(
365        request: Self::BatchMessage,
366        domain_to: PseudonymizationDomain,
367        ps: &mut PseudonymService,
368        rng: &mut R,
369    ) -> Result<Self::PEPBatchMessage, ApiError> {
370        let mut pseudo_data_points = vec![];
371        let mut enc_contexts = vec![];
372
373        for pseudo_data_point in request.pseudo_data_points {
374            let mut encrypted_pseudonyms = vec![];
375            for pseudonym in pseudo_data_point.participant {
376                let (enc_pseudonym, enc_context) =
377                    ps.encrypt(&pseudonym, rng).await.handle_pseudonym_error()?;
378                encrypted_pseudonyms.push(enc_pseudonym);
379                enc_contexts.push(enc_context);
380            }
381
382            let mut encrypted_extra_data = vec![];
383            let datapoints =
384                DataPoint::from_string_padded(pseudo_data_point.extra_data.to_string().as_str());
385
386            for datapoint in datapoints {
387                let (encrypted_data_point, enc_context) =
388                    ps.encrypt(&datapoint, rng).await.handle_pseudonym_error()?;
389                encrypted_extra_data.push(encrypted_data_point);
390                enc_contexts.push(enc_context);
391            }
392
393            pseudo_data_points.push(PEPPseudoDataPoint {
394                participant: encrypted_pseudonyms,
395                extra_data: encrypted_extra_data,
396            })
397        }
398
399        let sessions = enc_contexts
400            .first()
401            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
402
403        if !enc_contexts
404            .iter()
405            .all(|enc_context| enc_context == sessions)
406        {
407            return Err(ApiError::Internal(
408                "Encryption contexts do not match".to_string(),
409            ));
410        }
411
412        Ok(Self::PEPBatchMessage {
413            pseudo_data_points,
414            domain: request.domain,
415            domain_to,
416            sessions: sessions.clone(),
417        })
418    }
419    async fn unpack(
420        request: Self::PEPBatchMessage,
421        ps: &mut PseudonymService,
422    ) -> Result<Self::BatchMessage, ApiError> {
423        let encryptables: Vec<EncryptedEntityData> = request
424            .pseudo_data_points
425            .iter()
426            .map(|r| (r.participant.clone(), r.extra_data.clone()))
427            .collect();
428
429        let transcrypted = ps
430            .transcrypt(
431                &encryptables,
432                &request.sessions,
433                &request.domain,
434                &request.domain_to,
435            )
436            .await
437            .handle_pseudonym_error()?;
438
439        let mut pseudo_data_points_result = vec![];
440
441        for (pseudonyms, data_points) in transcrypted {
442            let mut decrypted_pseudonyms = vec![];
443            for pseu in pseudonyms {
444                let decrypted = ps.decrypt(&pseu).await.handle_pseudonym_error()?;
445                decrypted_pseudonyms.push(decrypted);
446            }
447
448            let mut decrypted_data_points = vec![];
449            for datap in data_points {
450                let decrypted = ps.decrypt(&datap).await.handle_pseudonym_error()?;
451                decrypted_data_points.push(decrypted);
452            }
453            let data_string =
454                DataPoint::to_string_padded(&decrypted_data_points).map_err(|_| {
455                    ApiError::Internal("Failed to convert data points to string".to_string())
456                })?;
457
458            let data: Value = from_str(&data_string)
459                .map_err(|e| ApiError::Internal(format!("Failed to parse extra data: {}", e)))?;
460
461            pseudo_data_points_result.push(PseudoDataPoint {
462                participant: decrypted_pseudonyms,
463                extra_data: data,
464            })
465        }
466
467        Ok(Self::BatchMessage {
468            pseudo_data_points: pseudo_data_points_result,
469            domain: request.domain_to,
470        })
471    }
472}
473
474// HasBatchInfo implementations for Activity structs
475impl HasBatchInfo<PseudoDataPoint> for ActivityBatchRequest {
476    fn participants(&self) -> Vec<Vec<Pseudonym>> {
477        self.pseudo_data_points
478            .iter()
479            .map(|r| r.participant.clone())
480            .collect()
481    }
482
483    fn extra_data(&self) -> Vec<Value> {
484        self.pseudo_data_points
485            .iter()
486            .map(|r| r.extra_data.clone())
487            .collect()
488    }
489
490    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
491        self.pseudo_data_points.clone()
492    }
493
494    fn domain(&self) -> PseudonymizationDomain {
495        self.domain.clone()
496    }
497}
498
499impl HasBatchInfo<PseudoDataPoint> for ActivityBatchResponse {
500    fn participants(&self) -> Vec<Vec<Pseudonym>> {
501        self.pseudo_data_points
502            .iter()
503            .map(|r| r.participant.clone())
504            .collect()
505    }
506
507    fn extra_data(&self) -> Vec<Value> {
508        self.pseudo_data_points
509            .iter()
510            .map(|r| r.extra_data.clone())
511            .collect()
512    }
513
514    fn pseudo_data_points(&self) -> Vec<PseudoDataPoint> {
515        self.pseudo_data_points.clone()
516    }
517
518    fn domain(&self) -> PseudonymizationDomain {
519        self.domain.clone()
520    }
521}
522
523// HasPEPBatchInfo implementations for Activity structs
524impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPActivityBatchRequest {
525    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
526        self.pseudo_data_points
527            .iter()
528            .map(|r| r.participant.clone())
529            .collect()
530    }
531
532    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
533        self.pseudo_data_points
534            .iter()
535            .map(|r| r.extra_data.clone())
536            .collect()
537    }
538
539    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
540        self.pseudo_data_points.clone()
541    }
542
543    fn domain(&self) -> PseudonymizationDomain {
544        self.domain.clone()
545    }
546
547    fn sessions(&self) -> EncryptionContexts {
548        self.sessions.clone()
549    }
550
551    fn domain_to(&self) -> PseudonymizationDomain {
552        self.domain_to.clone()
553    }
554}
555
556impl HasPEPBatchInfo<PEPPseudoDataPoint> for PEPActivityBatchResponse {
557    fn participants(&self) -> Vec<Vec<EncryptedPseudonym>> {
558        self.pseudo_data_points
559            .iter()
560            .map(|r| r.participant.clone())
561            .collect()
562    }
563
564    fn extra_data(&self) -> Vec<Vec<EncryptedDataPoint>> {
565        self.pseudo_data_points
566            .iter()
567            .map(|r| r.extra_data.clone())
568            .collect()
569    }
570
571    fn pseudo_data_points(&self) -> Vec<PEPPseudoDataPoint> {
572        self.pseudo_data_points.clone()
573    }
574
575    fn domain(&self) -> PseudonymizationDomain {
576        self.domain.clone()
577    }
578
579    fn sessions(&self) -> EncryptionContexts {
580        self.sessions.clone()
581    }
582
583    fn domain_to(&self) -> PseudonymizationDomain {
584        self.domain_to.clone()
585    }
586}
587
588// Message Type implementations
589#[derive(Debug)]
590pub struct ActivityBatchRequestMessageType;
591
592impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint> for ActivityBatchRequestMessageType {
593    type PEPBatchMessage = PEPActivityBatchRequest;
594    type BatchMessage = ActivityBatchRequest;
595
596    async fn pack<R: RngCore + CryptoRng>(
597        request: Self::BatchMessage,
598        domain_to: PseudonymizationDomain,
599        ps: &mut PseudonymService,
600        rng: &mut R,
601    ) -> Result<Self::PEPBatchMessage, ApiError> {
602        let mut pseudo_data_points = vec![];
603        let mut enc_contexts = vec![];
604
605        for pseudo_data_point in request.pseudo_data_points {
606            let mut encrypted_pseudonyms = vec![];
607            for pseudonym in pseudo_data_point.participant {
608                let (enc_pseudonym, enc_context) =
609                    ps.encrypt(&pseudonym, rng).await.handle_pseudonym_error()?;
610                encrypted_pseudonyms.push(enc_pseudonym);
611                enc_contexts.push(enc_context);
612            }
613
614            let mut encrypted_extra_data = vec![];
615            let datapoints =
616                DataPoint::from_string_padded(pseudo_data_point.extra_data.to_string().as_str());
617
618            for datapoint in datapoints {
619                let (encrypted_data_point, enc_context) =
620                    ps.encrypt(&datapoint, rng).await.handle_pseudonym_error()?;
621                encrypted_extra_data.push(encrypted_data_point);
622                enc_contexts.push(enc_context);
623            }
624
625            pseudo_data_points.push(PEPPseudoDataPoint {
626                participant: encrypted_pseudonyms,
627                extra_data: encrypted_extra_data,
628            });
629        }
630
631        let sessions = enc_contexts
632            .first()
633            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
634
635        if !enc_contexts
636            .iter()
637            .all(|enc_context| enc_context == sessions)
638        {
639            return Err(ApiError::Internal(
640                "Encryption contexts do not match".to_string(),
641            ));
642        }
643
644        Ok(Self::PEPBatchMessage {
645            pseudo_data_points,
646            activity: request.activity,
647            domain: request.domain,
648            domain_to,
649            sessions: sessions.clone(),
650        })
651    }
652
653    async fn unpack(
654        request: Self::PEPBatchMessage,
655        ps: &mut PseudonymService,
656    ) -> Result<Self::BatchMessage, ApiError> {
657        let encryptables: Vec<EncryptedEntityData> = request
658            .pseudo_data_points
659            .iter()
660            .map(|r| (r.participant.clone(), r.extra_data.clone()))
661            .collect();
662
663        let transcrypted = ps
664            .transcrypt(
665                &encryptables,
666                &request.sessions,
667                &request.domain,
668                &request.domain_to,
669            )
670            .await
671            .handle_pseudonym_error()?;
672
673        let mut pseudo_data_points_result = vec![];
674
675        for (pseudonyms, data_points) in transcrypted {
676            let mut decrypted_pseudonyms = vec![];
677            for pseu in pseudonyms {
678                let decrypted = ps.decrypt(&pseu).await.handle_pseudonym_error()?;
679                decrypted_pseudonyms.push(decrypted);
680            }
681
682            let mut decrypted_data_points = vec![];
683            for datap in data_points {
684                let decrypted = ps.decrypt(&datap).await.handle_pseudonym_error()?;
685                decrypted_data_points.push(decrypted);
686            }
687            let data_string =
688                DataPoint::to_string_padded(&decrypted_data_points).map_err(|_| {
689                    ApiError::Internal("Failed to convert data points to string".to_string())
690                })?;
691
692            let data: Value = from_str(&data_string)
693                .map_err(|e| ApiError::Internal(format!("Failed to parse extra data: {}", e)))?;
694
695            pseudo_data_points_result.push(PseudoDataPoint {
696                participant: decrypted_pseudonyms,
697                extra_data: data,
698            })
699        }
700
701        Ok(Self::BatchMessage {
702            pseudo_data_points: pseudo_data_points_result,
703            domain: request.domain_to,
704            activity: request.activity,
705        })
706    }
707}
708
709#[derive(Debug)]
710pub struct ActivityBatchResponseMessageType;
711
712impl PEPBatchMessageType<PEPPseudoDataPoint, PseudoDataPoint> for ActivityBatchResponseMessageType {
713    type PEPBatchMessage = PEPActivityBatchResponse;
714    type BatchMessage = ActivityBatchResponse;
715
716    async fn pack<R: RngCore + CryptoRng>(
717        request: Self::BatchMessage,
718        domain_to: PseudonymizationDomain,
719        ps: &mut PseudonymService,
720        rng: &mut R,
721    ) -> Result<Self::PEPBatchMessage, ApiError> {
722        let mut pseudo_data_points = vec![];
723        let mut enc_contexts = vec![];
724
725        for pseudo_data_point in request.pseudo_data_points {
726            let mut encrypted_pseudonyms = vec![];
727            for pseudonym in pseudo_data_point.participant {
728                let (enc_pseudonym, enc_context) =
729                    ps.encrypt(&pseudonym, rng).await.handle_pseudonym_error()?;
730                encrypted_pseudonyms.push(enc_pseudonym);
731                enc_contexts.push(enc_context);
732            }
733
734            let mut encrypted_extra_data = vec![];
735            let datapoints =
736                DataPoint::from_string_padded(pseudo_data_point.extra_data.to_string().as_str());
737            for datapoint in datapoints {
738                let (encrypted_data_point, enc_context) =
739                    ps.encrypt(&datapoint, rng).await.handle_pseudonym_error()?;
740                encrypted_extra_data.push(encrypted_data_point);
741                enc_contexts.push(enc_context);
742            }
743
744            pseudo_data_points.push(PEPPseudoDataPoint {
745                participant: encrypted_pseudonyms,
746                extra_data: encrypted_extra_data,
747            });
748        }
749
750        let sessions = enc_contexts
751            .first()
752            .ok_or(ApiError::Internal("No encryption contexts".to_string()))?;
753        if !enc_contexts
754            .iter()
755            .all(|enc_context| enc_context == sessions)
756        {
757            return Err(ApiError::Internal(
758                "Encryption contexts do not match".to_string(),
759            ));
760        }
761
762        Ok(Self::PEPBatchMessage {
763            pseudo_data_points,
764            activity: request.activity,
765            domain: request.domain,
766            domain_to,
767            sessions: sessions.clone(),
768        })
769    }
770
771    async fn unpack(
772        request: Self::PEPBatchMessage,
773        ps: &mut PseudonymService,
774    ) -> Result<Self::BatchMessage, ApiError> {
775        let encryptables: Vec<EncryptedEntityData> = request
776            .pseudo_data_points
777            .iter()
778            .map(|r| (r.participant.clone(), r.extra_data.clone()))
779            .collect();
780
781        let transcrypted = ps
782            .transcrypt(
783                &encryptables,
784                &request.sessions,
785                &request.domain,
786                &request.domain_to,
787            )
788            .await
789            .handle_pseudonym_error()?;
790
791        let mut pseudo_data_points_result = vec![];
792
793        for (pseudonyms, data_points) in transcrypted {
794            let mut decrypted_pseudonyms = vec![];
795            for pseu in pseudonyms {
796                let decrypted = ps.decrypt(&pseu).await.handle_pseudonym_error()?;
797                decrypted_pseudonyms.push(decrypted);
798            }
799
800            let mut decrypted_data_points = vec![];
801            for datap in data_points {
802                let decrypted = ps.decrypt(&datap).await.handle_pseudonym_error()?;
803                decrypted_data_points.push(decrypted);
804            }
805            let data_string =
806                DataPoint::to_string_padded(&decrypted_data_points).map_err(|_| {
807                    ApiError::Internal("Failed to convert data points to string".to_string())
808                })?;
809
810            let data: Value = from_str(&data_string)
811                .map_err(|e| ApiError::Internal(format!("Failed to parse extra data: {}", e)))?;
812
813            pseudo_data_points_result.push(PseudoDataPoint {
814                participant: decrypted_pseudonyms,
815                extra_data: data,
816            })
817        }
818
819        Ok(Self::BatchMessage {
820            pseudo_data_points: pseudo_data_points_result,
821            domain: request.domain_to,
822            activity: request.activity,
823        })
824    }
825}