1use crate::api::ApiError::Internal;
2use crate::batch_messages::{
3 ActivityBatchRequestMessageType, ActivityBatchResponseMessageType, HasPEPBatchInfo,
4 PEPBatchMessageType, PEPPseudoDataPoint, ParticipantBatchRequestMessageType,
5 ParticipantBatchResponseMessageType, PseudoDataPoint,
6};
7use crate::data_source::DataSource;
8use crate::messages::{
9 ActivityRequestMessageType, ActivityResponseMessageType, HasPEPParticipantInfo, PEPMessageType,
10 ParticipantRequestMessageType, ParticipantResponseMessageType,
11};
12use crate::pseudonym_service_factory::{create_pseudonym_service, OauthAuth};
13use actix_web::web::ServiceConfig;
14use actix_web::{error::ErrorUnauthorized, middleware::Logger, web, HttpResponse, ResponseError};
15use actix_web_httpauth::middleware::HttpAuthentication;
16use log::error;
17use oauth_token_service::TokenServiceConfig;
18use paas_client::prelude::PAASConfig;
19use paas_client::pseudonym_service::{PseudonymService, PseudonymServiceError};
20use rand::rngs::OsRng;
21use std::fmt::{Display, Formatter};
22use std::future::Future;
23use std::sync::Arc;
24use tokio::sync::Mutex;
25
26#[derive(Debug)]
27pub enum ApiError {
28 NotFound,
29 Internal(String),
30 BadRequest(String),
31}
32
33impl Display for ApiError {
34 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35 match self {
36 ApiError::NotFound => write!(f, "Not found"),
37 ApiError::Internal(msg) => write!(f, "Internal error: {}", msg),
38 ApiError::BadRequest(msg) => write!(f, "Bad request: {}", msg),
39 }
40 }
41}
42impl ResponseError for ApiError {
43 fn error_response(&self) -> HttpResponse {
44 match self {
45 ApiError::NotFound => HttpResponse::NotFound().json("Not found"),
46 ApiError::Internal(msg) => HttpResponse::InternalServerError().json(msg),
47 ApiError::BadRequest(msg) => HttpResponse::BadRequest().json(msg),
48 }
49 }
50}
51#[derive(Clone)]
52pub struct DataSourceAPI {
53 connector: Arc<dyn DataSource>,
54 pseudonym_service: Arc<Mutex<PseudonymService>>,
55 nest_auth_token: String,
56}
57
58impl DataSourceAPI {
59 pub async fn init(
61 connector: impl DataSource,
62 nest_auth_token: String,
63 ps_config: PAASConfig,
64 oauth_config: TokenServiceConfig,
65 ) -> Self {
66 let oauth_auth = OauthAuth::new(oauth_config);
67 let pseudonym_service = create_pseudonym_service(oauth_auth, ps_config).await;
68 Self {
69 connector: Arc::new(connector),
70 pseudonym_service,
71 nest_auth_token,
72 }
73 }
74
75 pub fn configure_service(&self, cfg: &mut ServiceConfig) {
77 let auth_token = self.nest_auth_token.clone();
78 let connector = self.connector.clone();
79
80 let auth = HttpAuthentication::bearer(move |req, credentials| {
81 let auth_token = auth_token.clone();
82 async move {
83 if credentials.token() == auth_token {
84 Ok(req)
85 } else {
86 Err((ErrorUnauthorized("Invalid token"), req))
87 }
88 }
89 });
90
91 cfg.app_data(web::Data::new(connector)).service(
92 web::scope("/api")
93 .app_data(web::Data::new(self.pseudonym_service.clone()))
94 .wrap(auth)
95 .route(
96 "/participant/create",
97 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
98 let connector = connector.clone();
99 async move {
100 handle_pep::<
101 ParticipantRequestMessageType,
102 ParticipantResponseMessageType,
103 _,
104 _,
105 >(
106 |r| connector.create_local_participant(r), req, ps
107 )
108 .await
109 }
110 }),
111 )
112 .route(
113 "/participant/create_batch",
114 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
115 let connector = connector.clone();
116 async move {
117 handle_pep_batch::<
118 ParticipantBatchRequestMessageType,
119 ParticipantBatchResponseMessageType,
120 PEPPseudoDataPoint,
121 PseudoDataPoint,
122 _,
123 _,
124 >(
125 |r| connector.create_local_participant_batch(r), req, ps
126 )
127 .await
128 }
129 }),
130 )
131 .route(
132 "/participant/update",
133 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
134 let connector = connector.clone();
135 async move {
136 handle_pep::<
137 ParticipantRequestMessageType,
138 ParticipantResponseMessageType,
139 _,
140 _,
141 >(
142 |r| connector.update_local_participant(r), req, ps
143 )
144 .await
145 }
146 }),
147 )
148 .route(
149 "/participant/update_batch",
150 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
151 let connector = connector.clone();
152 async move {
153 handle_pep_batch::<
154 ParticipantBatchRequestMessageType,
155 ParticipantBatchResponseMessageType,
156 PEPPseudoDataPoint,
157 PseudoDataPoint,
158 _,
159 _,
160 >(
161 |r| connector.update_local_participant_batch(r), req, ps
162 )
163 .await
164 }
165 }),
166 )
167 .route(
168 "/participant/delete",
169 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
170 let connector = connector.clone();
171 async move {
172 handle_pep::<
173 ParticipantRequestMessageType,
174 ParticipantResponseMessageType,
175 _,
176 _,
177 >(
178 |r| connector.delete_local_participant(r), req, ps
179 )
180 .await
181 }
182 }),
183 )
184 .route(
185 "/participant/delete_batch",
186 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
187 let connector = connector.clone();
188 async move {
189 handle_pep_batch::<
190 ParticipantBatchRequestMessageType,
191 ParticipantBatchResponseMessageType,
192 PEPPseudoDataPoint,
193 PseudoDataPoint,
194 _,
195 _,
196 >(
197 |r| connector.delete_local_participant_batch(r), req, ps
198 )
199 .await
200 }
201 }),
202 )
203 .route(
204 "/participant/details",
205 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
206 let connector = connector.clone();
207 async move {
208 handle_pep::<
209 ParticipantRequestMessageType,
210 ParticipantResponseMessageType,
211 _,
212 _,
213 >(
214 |r| connector.local_participant_details(r), req, ps
215 )
216 .await
217 }
218 }),
219 )
220 .route(
221 "/participant/details_batch",
222 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
223 let connector = connector.clone();
224 async move {
225 handle_pep_batch::<
226 ParticipantBatchRequestMessageType,
227 ParticipantBatchResponseMessageType,
228 PEPPseudoDataPoint,
229 PseudoDataPoint,
230 _,
231 _,
232 >(
233 |r| connector.local_participant_details_batch(r), req, ps
234 )
235 .await
236 }
237 }),
238 )
239 .route(
240 "/participant/activities/create",
241 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
242 let connector = connector.clone();
243 async move {
244 handle_pep::<
245 ActivityRequestMessageType,
246 ActivityResponseMessageType,
247 _,
248 _,
249 >(
250 |r| connector.create_participant_activity(r), req, ps
251 )
252 .await
253 }
254 }),
255 )
256 .route(
257 "/participant/activities/create_batch",
258 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
259 let connector = connector.clone();
260 async move {
261 handle_pep_batch::<
262 ActivityBatchRequestMessageType,
263 ActivityBatchResponseMessageType,
264 PEPPseudoDataPoint,
265 PseudoDataPoint,
266 _,
267 _,
268 >(
269 |r| connector.create_participant_activity_batch(r), req, ps
270 )
271 .await
272 }
273 }),
274 )
275 .route(
276 "/participant/activities/update",
277 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
278 let connector = connector.clone();
279 async move {
280 handle_pep::<
281 ActivityRequestMessageType,
282 ActivityResponseMessageType,
283 _,
284 _,
285 >(
286 |r| connector.update_participant_activity(r), req, ps
287 )
288 .await
289 }
290 }),
291 )
292 .route(
293 "/participant/activities/update_batch",
294 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
295 let connector = connector.clone();
296 async move {
297 handle_pep_batch::<
298 ActivityBatchRequestMessageType,
299 ActivityBatchResponseMessageType,
300 PEPPseudoDataPoint,
301 PseudoDataPoint,
302 _,
303 _,
304 >(
305 |r| connector.update_participant_activity_batch(r), req, ps
306 )
307 .await
308 }
309 }),
310 )
311 .route(
312 "/participant/activities/delete",
313 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
314 let connector = connector.clone();
315 async move {
316 handle_pep::<
317 ActivityRequestMessageType,
318 ActivityResponseMessageType,
319 _,
320 _,
321 >(
322 |r| connector.delete_participant_activity(r), req, ps
323 )
324 .await
325 }
326 }),
327 )
328 .route(
329 "/participant/activities/delete_batch",
330 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
331 let connector = connector.clone();
332 async move {
333 handle_pep_batch::<
334 ActivityBatchRequestMessageType,
335 ActivityBatchResponseMessageType,
336 PEPPseudoDataPoint,
337 PseudoDataPoint,
338 _,
339 _,
340 >(
341 |r| connector.delete_participant_activity_batch(r), req, ps
342 )
343 .await
344 }
345 }),
346 )
347 .route(
348 "/participant/activities/details",
349 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
350 let connector = connector.clone();
351 async move {
352 handle_pep::<
353 ActivityRequestMessageType,
354 ActivityResponseMessageType,
355 _,
356 _,
357 >(
358 |r| connector.participant_activity_details(r), req, ps
359 )
360 .await
361 }
362 }),
363 )
364 .route(
365 "/participant/activities/details_batch",
366 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
367 let connector = connector.clone();
368 async move {
369 handle_pep_batch::<
370 ActivityBatchRequestMessageType,
371 ActivityBatchResponseMessageType,
372 PEPPseudoDataPoint,
373 PseudoDataPoint,
374 _,
375 _,
376 >(
377 |r| connector.participant_activity_details_batch(r), req, ps
378 )
379 .await
380 }
381 }),
382 )
383 .route(
384 "/participant/activities/progress",
385 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
386 let connector = connector.clone();
387 async move {
388 handle_pep::<
389 ActivityRequestMessageType,
390 ActivityResponseMessageType,
391 _,
392 _,
393 >(
394 |r| connector.participant_activity_progress(r), req, ps
395 )
396 .await
397 }
398 }),
399 )
400 .route(
401 "/participant/activities/progress_batch",
402 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
403 let connector = connector.clone();
404 async move {
405 handle_pep_batch::<
406 ActivityBatchRequestMessageType,
407 ActivityBatchResponseMessageType,
408 PEPPseudoDataPoint,
409 PseudoDataPoint,
410 _,
411 _,
412 >(
413 |r| connector.participant_activity_progress_batch(r),
414 req,
415 ps,
416 )
417 .await
418 }
419 }),
420 )
421 .route(
422 "/participant/activities/result",
423 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
424 let connector = connector.clone();
425 async move {
426 handle_pep::<
427 ActivityRequestMessageType,
428 ActivityResponseMessageType,
429 _,
430 _,
431 >(
432 |r| connector.participant_activity_result(r), req, ps
433 )
434 .await
435 }
436 }), )
438 .route(
439 "/participant/activities/result_batch",
440 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
441 let connector = connector.clone();
442 async move {
443 handle_pep_batch::<
444 ActivityBatchRequestMessageType,
445 ActivityBatchResponseMessageType,
446 PEPPseudoDataPoint,
447 PseudoDataPoint,
448 _,
449 _,
450 >(
451 |r| connector.participant_activity_result_batch(r), req, ps
452 )
453 .await
454 }
455 }),
456 ),
457 );
458 }
499
500 pub async fn run_standalone(self, host: &str, port: u16) -> std::io::Result<()> {
502 use actix_web::{App, HttpServer};
503
504 HttpServer::new(move || {
505 App::new()
506 .wrap(Logger::default())
507 .configure(|cfg| self.configure_service(cfg))
508 })
509 .bind((host, port))?
510 .run()
511 .await
512 }
513}
514
515async fn handle_pep<Req, Res, F, Fut>(
516 handle: F,
517 request: web::Json<Req::PEPMessage>,
518 pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
519) -> Result<HttpResponse, ApiError>
520where
521 Req: PEPMessageType, Res: PEPMessageType, F: FnOnce(Req::Message) -> Fut, Fut: Future<Output = Result<Option<Res::Message>, ApiError>>, {
526 let rng = &mut OsRng;
527 let pep_request = request.into_inner();
528 let domain_to = pep_request.domain_to().clone();
529
530 let mut ps = pseudonym_service.lock().await;
531
532 let request = Req::unpack(pep_request, &mut ps).await?;
533 let response: Option<Res::Message> = handle(request).await?;
534
535 if let Some(response) = response {
536 let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
537 Ok(HttpResponse::Ok().json(pep_response))
538 } else {
539 Ok(HttpResponse::Ok().finish())
540 }
541}
542
543async fn handle_pep_batch<Req, Res, PepT, T, F, Fut>(
544 handle: F,
545 request: web::Json<Req::PEPBatchMessage>,
546 pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
547) -> Result<HttpResponse, ApiError>
548where
549 Req: PEPBatchMessageType<PepT, T>, Res: PEPBatchMessageType<PepT, T>, F: FnOnce(Req::BatchMessage) -> Fut, Fut: Future<Output = Result<Option<Res::BatchMessage>, ApiError>>, {
554 let rng = &mut OsRng;
555 let pep_request = request.into_inner();
556
557 let domain_to = pep_request.domain_to().clone();
558
559 let mut ps = pseudonym_service.lock().await;
560
561 let request = Req::unpack(pep_request, &mut ps).await?;
562 let response: Option<Res::BatchMessage> = handle(request).await?;
563
564 if let Some(response) = response {
565 let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
566 Ok(HttpResponse::Ok().json(pep_response))
567 } else {
568 Ok(HttpResponse::Ok().finish())
569 }
570}
571
572pub(crate) trait PseudonymServiceErrorHandler<T> {
573 fn handle_pseudonym_error(self) -> Result<T, ApiError>;
574}
575
576impl<T> PseudonymServiceErrorHandler<T> for Result<T, PseudonymServiceError> {
577 fn handle_pseudonym_error(self) -> Result<T, ApiError> {
578 self.map_err(|e| {
580 error!("Pseudonym service error: {e:?}");
581 Internal("Pseudonym service operation failed".to_string())
582 })
583 }
584}