nest_data_source_api/
api.rs

1use crate::api::ApiError::Internal;
2use crate::batch_messages::{
3    ActivityBatchRequestMessageType, ActivityBatchResponseMessageType, HasPEPBatchInfo,
4    PEPBatchMessageType, PEPPseudoDataPoint, ParticipantBatchRequestMessageType,
5    ParticipantBatchResponseMessageType, PseudoDataPoint,
6};
7use crate::data_source::DataSource;
8use crate::messages::{
9    ActivityRequestMessageType, ActivityResponseMessageType, HasPEPParticipantInfo, PEPMessageType,
10    ParticipantRequestMessageType, ParticipantResponseMessageType,
11};
12use crate::pseudonym_service_factory::{create_pseudonym_service, OauthAuth};
13use actix_web::web::ServiceConfig;
14use actix_web::{error::ErrorUnauthorized, middleware::Logger, web, HttpResponse, ResponseError};
15use actix_web_httpauth::middleware::HttpAuthentication;
16use log::error;
17use oauth_token_service::TokenServiceConfig;
18use paas_client::prelude::PAASConfig;
19use paas_client::pseudonym_service::{PseudonymService, PseudonymServiceError};
20use rand::rngs::OsRng;
21use std::fmt::{Display, Formatter};
22use std::future::Future;
23use std::sync::Arc;
24use tokio::sync::Mutex;
25
26#[derive(Debug)]
27pub enum ApiError {
28    NotFound,
29    Internal(String),
30    BadRequest(String),
31}
32
33impl Display for ApiError {
34    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35        match self {
36            ApiError::NotFound => write!(f, "Not found"),
37            ApiError::Internal(msg) => write!(f, "Internal error: {}", msg),
38            ApiError::BadRequest(msg) => write!(f, "Bad request: {}", msg),
39        }
40    }
41}
42impl ResponseError for ApiError {
43    fn error_response(&self) -> HttpResponse {
44        match self {
45            ApiError::NotFound => HttpResponse::NotFound().json("Not found"),
46            ApiError::Internal(msg) => HttpResponse::InternalServerError().json(msg),
47            ApiError::BadRequest(msg) => HttpResponse::BadRequest().json(msg),
48        }
49    }
50}
51#[derive(Clone)]
52pub struct DataSourceAPI {
53    connector: Arc<dyn DataSource>,
54    pseudonym_service: Arc<Mutex<PseudonymService>>,
55    nest_auth_token: String,
56}
57
58impl DataSourceAPI {
59    /// Create a new instance of the DataSourceAPI.
60    pub async fn init(
61        connector: impl DataSource,
62        nest_auth_token: String,
63        ps_config: PAASConfig,
64        oauth_config: TokenServiceConfig,
65    ) -> Self {
66        let oauth_auth = OauthAuth::new(oauth_config);
67        let pseudonym_service = create_pseudonym_service(oauth_auth, ps_config).await;
68        Self {
69            connector: Arc::new(connector),
70            pseudonym_service,
71            nest_auth_token,
72        }
73    }
74
75    /// Configure an actix-web service with API routes and authentication middleware.
76    pub fn configure_service(&self, cfg: &mut ServiceConfig) {
77        let auth_token = self.nest_auth_token.clone();
78        let connector = self.connector.clone();
79
80        let auth = HttpAuthentication::bearer(move |req, credentials| {
81            let auth_token = auth_token.clone();
82            async move {
83                if credentials.token() == auth_token {
84                    Ok(req)
85                } else {
86                    Err((ErrorUnauthorized("Invalid token"), req))
87                }
88            }
89        });
90
91        cfg.app_data(web::Data::new(connector)).service(
92            web::scope("/api")
93                .app_data(web::Data::new(self.pseudonym_service.clone()))
94                .wrap(auth)
95                .route(
96                    "/participant/create",
97                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
98                        let connector = connector.clone();
99                        async move {
100                            handle_pep::<
101                                ParticipantRequestMessageType,
102                                ParticipantResponseMessageType,
103                                _,
104                                _,
105                            >(
106                                |r| connector.create_local_participant(r), req, ps
107                            )
108                            .await
109                        }
110                    }),
111                )
112                .route(
113                    "/participant/create_batch",
114                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
115                        let connector = connector.clone();
116                        async move {
117                            handle_pep_batch::<
118                                ParticipantBatchRequestMessageType,
119                                ParticipantBatchResponseMessageType,
120                                PEPPseudoDataPoint,
121                                PseudoDataPoint,
122                                _,
123                                _,
124                            >(
125                                |r| connector.create_local_participant_batch(r), req, ps
126                            )
127                            .await
128                        }
129                    }),
130                )
131                .route(
132                    "/participant/update",
133                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
134                        let connector = connector.clone();
135                        async move {
136                            handle_pep::<
137                                ParticipantRequestMessageType,
138                                ParticipantResponseMessageType,
139                                _,
140                                _,
141                            >(
142                                |r| connector.update_local_participant(r), req, ps
143                            )
144                            .await
145                        }
146                    }),
147                )
148                .route(
149                    "/participant/update_batch",
150                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
151                        let connector = connector.clone();
152                        async move {
153                            handle_pep_batch::<
154                                ParticipantBatchRequestMessageType,
155                                ParticipantBatchResponseMessageType,
156                                PEPPseudoDataPoint,
157                                PseudoDataPoint,
158                                _,
159                                _,
160                            >(
161                                |r| connector.update_local_participant_batch(r), req, ps
162                            )
163                            .await
164                        }
165                    }),
166                )
167                .route(
168                    "/participant/delete",
169                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
170                        let connector = connector.clone();
171                        async move {
172                            handle_pep::<
173                                ParticipantRequestMessageType,
174                                ParticipantResponseMessageType,
175                                _,
176                                _,
177                            >(
178                                |r| connector.delete_local_participant(r), req, ps
179                            )
180                            .await
181                        }
182                    }),
183                )
184                .route(
185                    "/participant/delete_batch",
186                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
187                        let connector = connector.clone();
188                        async move {
189                            handle_pep_batch::<
190                                ParticipantBatchRequestMessageType,
191                                ParticipantBatchResponseMessageType,
192                                PEPPseudoDataPoint,
193                                PseudoDataPoint,
194                                _,
195                                _,
196                            >(
197                                |r| connector.delete_local_participant_batch(r), req, ps
198                            )
199                            .await
200                        }
201                    }),
202                )
203                .route(
204                    "/participant/details",
205                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
206                        let connector = connector.clone();
207                        async move {
208                            handle_pep::<
209                                ParticipantRequestMessageType,
210                                ParticipantResponseMessageType,
211                                _,
212                                _,
213                            >(
214                                |r| connector.local_participant_details(r), req, ps
215                            )
216                            .await
217                        }
218                    }),
219                )
220                .route(
221                    "/participant/details_batch",
222                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
223                        let connector = connector.clone();
224                        async move {
225                            handle_pep_batch::<
226                                ParticipantBatchRequestMessageType,
227                                ParticipantBatchResponseMessageType,
228                                PEPPseudoDataPoint,
229                                PseudoDataPoint,
230                                _,
231                                _,
232                            >(
233                                |r| connector.local_participant_details_batch(r), req, ps
234                            )
235                            .await
236                        }
237                    }),
238                )
239                .route(
240                    "/participant/activities/create",
241                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
242                        let connector = connector.clone();
243                        async move {
244                            handle_pep::<
245                                ActivityRequestMessageType,
246                                ActivityResponseMessageType,
247                                _,
248                                _,
249                            >(
250                                |r| connector.create_participant_activity(r), req, ps
251                            )
252                            .await
253                        }
254                    }),
255                )
256                .route(
257                    "/participant/activities/create_batch",
258                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
259                        let connector = connector.clone();
260                        async move {
261                            handle_pep_batch::<
262                                ActivityBatchRequestMessageType,
263                                ActivityBatchResponseMessageType,
264                                PEPPseudoDataPoint,
265                                PseudoDataPoint,
266                                _,
267                                _,
268                            >(
269                                |r| connector.create_participant_activity_batch(r), req, ps
270                            )
271                            .await
272                        }
273                    }),
274                )
275                .route(
276                    "/participant/activities/update",
277                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
278                        let connector = connector.clone();
279                        async move {
280                            handle_pep::<
281                                ActivityRequestMessageType,
282                                ActivityResponseMessageType,
283                                _,
284                                _,
285                            >(
286                                |r| connector.update_participant_activity(r), req, ps
287                            )
288                            .await
289                        }
290                    }),
291                )
292                .route(
293                    "/participant/activities/update_batch",
294                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
295                        let connector = connector.clone();
296                        async move {
297                            handle_pep_batch::<
298                                ActivityBatchRequestMessageType,
299                                ActivityBatchResponseMessageType,
300                                PEPPseudoDataPoint,
301                                PseudoDataPoint,
302                                _,
303                                _,
304                            >(
305                                |r| connector.update_participant_activity_batch(r), req, ps
306                            )
307                            .await
308                        }
309                    }),
310                )
311                .route(
312                    "/participant/activities/delete",
313                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
314                        let connector = connector.clone();
315                        async move {
316                            handle_pep::<
317                                ActivityRequestMessageType,
318                                ActivityResponseMessageType,
319                                _,
320                                _,
321                            >(
322                                |r| connector.delete_participant_activity(r), req, ps
323                            )
324                            .await
325                        }
326                    }),
327                )
328                .route(
329                    "/participant/activities/delete_batch",
330                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
331                        let connector = connector.clone();
332                        async move {
333                            handle_pep_batch::<
334                                ActivityBatchRequestMessageType,
335                                ActivityBatchResponseMessageType,
336                                PEPPseudoDataPoint,
337                                PseudoDataPoint,
338                                _,
339                                _,
340                            >(
341                                |r| connector.delete_participant_activity_batch(r), req, ps
342                            )
343                            .await
344                        }
345                    }),
346                )
347                .route(
348                    "/participant/activities/details",
349                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
350                        let connector = connector.clone();
351                        async move {
352                            handle_pep::<
353                                ActivityRequestMessageType,
354                                ActivityResponseMessageType,
355                                _,
356                                _,
357                            >(
358                                |r| connector.participant_activity_details(r), req, ps
359                            )
360                            .await
361                        }
362                    }),
363                )
364                .route(
365                    "/participant/activities/details_batch",
366                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
367                        let connector = connector.clone();
368                        async move {
369                            handle_pep_batch::<
370                                ActivityBatchRequestMessageType,
371                                ActivityBatchResponseMessageType,
372                                PEPPseudoDataPoint,
373                                PseudoDataPoint,
374                                _,
375                                _,
376                            >(
377                                |r| connector.participant_activity_details_batch(r), req, ps
378                            )
379                            .await
380                        }
381                    }),
382                )
383                .route(
384                    "/participant/activities/progress",
385                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
386                        let connector = connector.clone();
387                        async move {
388                            handle_pep::<
389                                ActivityRequestMessageType,
390                                ActivityResponseMessageType,
391                                _,
392                                _,
393                            >(
394                                |r| connector.participant_activity_progress(r), req, ps
395                            )
396                            .await
397                        }
398                    }),
399                )
400                .route(
401                    "/participant/activities/progress_batch",
402                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
403                        let connector = connector.clone();
404                        async move {
405                            handle_pep_batch::<
406                                ActivityBatchRequestMessageType,
407                                ActivityBatchResponseMessageType,
408                                PEPPseudoDataPoint,
409                                PseudoDataPoint,
410                                _,
411                                _,
412                            >(
413                                |r| connector.participant_activity_progress_batch(r),
414                                req,
415                                ps,
416                            )
417                            .await
418                        }
419                    }),
420                )
421                .route(
422                    "/participant/activities/result",
423                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
424                        let connector = connector.clone();
425                        async move {
426                            handle_pep::<
427                                ActivityRequestMessageType,
428                                ActivityResponseMessageType,
429                                _,
430                                _,
431                            >(
432                                |r| connector.participant_activity_result(r), req, ps
433                            )
434                            .await
435                        }
436                    }), // TODO: maybe we should require different authentication for this route, since we only want end-users to access it
437                )
438                .route(
439                    "/participant/activities/result_batch",
440                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
441                        let connector = connector.clone();
442                        async move {
443                            handle_pep_batch::<
444                                ActivityBatchRequestMessageType,
445                                ActivityBatchResponseMessageType,
446                                PEPPseudoDataPoint,
447                                PseudoDataPoint,
448                                _,
449                                _,
450                            >(
451                                |r| connector.participant_activity_result_batch(r), req, ps
452                            )
453                            .await
454                        }
455                    }),
456                ),
457        );
458        // .route("/participants/create", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
459        //     let connector = connector.clone();
460        //     async move { handle(|r| connector.create_local_participants(r), req, ps).await }
461        // }))
462        // .route("/participants/update", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
463        //     let connector = connector.clone();
464        //     async move { handle(|r| connector.update_participants(r), req, ps).await }
465        // }))
466        // .route("/participants/delete", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
467        //     let connector = connector.clone();
468        //     async move { handle(|r| connector.delete_participants(r), req, ps).await }
469        // }))
470        // .route("/participants/details", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
471        //     let connector = connector.clone();
472        //     async move { handle(|r| connector.delete_participants(r), req, ps).await }
473        // }))
474        // .route("/participants/activities/create", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
475        //     let connector = connector.clone();
476        //     async move { handle(|r| connector.create_participants_activity(r), req, ps).await }
477        // }))
478        // .route("/participants/activities/update", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
479        //     let connector = connector.clone();
480        //     async move { handle(|r| connector.update_participants_activity(r), req, ps).await }
481        // }))
482        // .route("/participants/activities/delete", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
483        //     let connector = connector.clone();
484        //     async move { handle(|r| connector.delete_participants_activity(r), req, ps).await }
485        // }))
486        // .route("/participants/activities/details", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
487        //     let connector = connector.clone();
488        //     async move { handle(|r| connector.participants_activity_details(r), req, ps).await }
489        // }))
490        // .route("/participants/activities/progress", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
491        //     let connector = connector.clone();
492        //     async move { handle(|r| connector.participants_activity_progress(r), req, ps).await }
493        // }))
494        // .route("/participants/activities/result", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
495        //     let connector = connector.clone();
496        //     async move { handle(|r| connector.participants_activity_result(r), req, ps).await }
497        // }))
498    }
499
500    /// Run as a standalone server (if needed)
501    pub async fn run_standalone(self, host: &str, port: u16) -> std::io::Result<()> {
502        use actix_web::{App, HttpServer};
503
504        HttpServer::new(move || {
505            App::new()
506                .wrap(Logger::default())
507                .configure(|cfg| self.configure_service(cfg))
508        })
509        .bind((host, port))?
510        .run()
511        .await
512    }
513}
514
515async fn handle_pep<Req, Res, F, Fut>(
516    handle: F,
517    request: web::Json<Req::PEPMessage>,
518    pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
519) -> Result<HttpResponse, ApiError>
520where
521    Req: PEPMessageType,            // Request message type
522    Res: PEPMessageType,            // Response message type
523    F: FnOnce(Req::Message) -> Fut, // Async handler type
524    Fut: Future<Output = Result<Option<Res::Message>, ApiError>>, // Handler output
525{
526    let rng = &mut OsRng;
527    let pep_request = request.into_inner();
528    let domain_to = pep_request.domain_to().clone();
529
530    let mut ps = pseudonym_service.lock().await;
531
532    let request = Req::unpack(pep_request, &mut ps).await?;
533    let response: Option<Res::Message> = handle(request).await?;
534
535    if let Some(response) = response {
536        let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
537        Ok(HttpResponse::Ok().json(pep_response))
538    } else {
539        Ok(HttpResponse::Ok().finish())
540    }
541}
542
543async fn handle_pep_batch<Req, Res, PepT, T, F, Fut>(
544    handle: F,
545    request: web::Json<Req::PEPBatchMessage>,
546    pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
547) -> Result<HttpResponse, ApiError>
548where
549    Req: PEPBatchMessageType<PepT, T>,   // Request message type
550    Res: PEPBatchMessageType<PepT, T>,   // Response message type
551    F: FnOnce(Req::BatchMessage) -> Fut, // Async handler type
552    Fut: Future<Output = Result<Option<Res::BatchMessage>, ApiError>>, // Handler output
553{
554    let rng = &mut OsRng;
555    let pep_request = request.into_inner();
556
557    let domain_to = pep_request.domain_to().clone();
558
559    let mut ps = pseudonym_service.lock().await;
560
561    let request = Req::unpack(pep_request, &mut ps).await?;
562    let response: Option<Res::BatchMessage> = handle(request).await?;
563
564    if let Some(response) = response {
565        let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
566        Ok(HttpResponse::Ok().json(pep_response))
567    } else {
568        Ok(HttpResponse::Ok().finish())
569    }
570}
571
572pub(crate) trait PseudonymServiceErrorHandler<T> {
573    fn handle_pseudonym_error(self) -> Result<T, ApiError>;
574}
575
576impl<T> PseudonymServiceErrorHandler<T> for Result<T, PseudonymServiceError> {
577    fn handle_pseudonym_error(self) -> Result<T, ApiError> {
578        // TODO: Specific error handling
579        self.map_err(|e| {
580            error!("Pseudonym service error: {e:?}");
581            Internal("Pseudonym service operation failed".to_string())
582        })
583    }
584}