1use crate::api::ApiError::Internal;
2use crate::batch_messages::{
3 ActivityBatchRequestMessageType, ActivityBatchResponseMessageType, HasPEPBatchInfo,
4 PEPBatchMessageType, PEPPseudoDataPoint, PseudoDataPoint,
5};
6use crate::data_source::DataSource;
7use crate::messages::{
8 ActivityRequestMessageType, ActivityResponseMessageType, HasPEPParticipantInfo, PEPMessageType,
9};
10use crate::pseudonym_service_factory::{create_pseudonym_service, OauthAuth};
11use actix_web::web::ServiceConfig;
12use actix_web::{error::ErrorUnauthorized, middleware::Logger, web, HttpResponse, ResponseError};
13use actix_web_httpauth::middleware::HttpAuthentication;
14use log::error;
15use oauth_token_service::TokenServiceConfig;
16use paas_client::prelude::PAASConfig;
17use paas_client::pseudonym_service::{PseudonymService, PseudonymServiceError};
18use rand::rngs::OsRng;
19use std::fmt::{Display, Formatter};
20use std::future::Future;
21use std::sync::Arc;
22use tokio::sync::Mutex;
23
/// Error type returned by the HTTP handlers in this module.
///
/// Each variant is rendered as text by the `Display` impl and turned into an
/// HTTP response by the `ResponseError` impl further down in this file.
#[derive(Debug)]
pub enum ApiError {
    /// Rendered as an HTTP "Not Found" response with the JSON body `"Not found"`.
    NotFound,
    /// Rendered as an HTTP "Internal Server Error" response; the contained
    /// message is sent as the JSON body.
    Internal(String),
    /// Rendered as an HTTP "Bad Request" response; the contained message is
    /// sent as the JSON body.
    BadRequest(String),
}
30
31impl Display for ApiError {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 match self {
34 ApiError::NotFound => write!(f, "Not found"),
35 ApiError::Internal(msg) => write!(f, "Internal error: {}", msg),
36 ApiError::BadRequest(msg) => write!(f, "Bad request: {}", msg),
37 }
38 }
39}
40impl ResponseError for ApiError {
41 fn error_response(&self) -> HttpResponse {
42 match self {
43 ApiError::NotFound => HttpResponse::NotFound().json("Not found"),
44 ApiError::Internal(msg) => HttpResponse::InternalServerError().json(msg),
45 ApiError::BadRequest(msg) => HttpResponse::BadRequest().json(msg),
46 }
47 }
48}
/// HTTP API front-end for a [`DataSource`] implementation.
///
/// Cloning is cheap for the shared parts: the connector and the pseudonym
/// service are both held behind `Arc`.
#[derive(Clone)]
pub struct DataSourceAPI {
    /// The data source the route handlers forward unpacked requests to.
    connector: Arc<dyn DataSource>,
    /// Pseudonym service shared by all handlers, guarded by an async mutex.
    pseudonym_service: Arc<Mutex<PseudonymService>>,
    /// Bearer token that requests under `/api` must present.
    nest_auth_token: String,
}
55
56impl DataSourceAPI {
57 pub async fn init(
59 connector: impl DataSource,
60 nest_auth_token: String,
61 ps_config: PAASConfig,
62 oauth_config: TokenServiceConfig,
63 ) -> Self {
64 let oauth_auth = OauthAuth::new(oauth_config);
65 let pseudonym_service = create_pseudonym_service(oauth_auth, ps_config).await;
66 Self {
67 connector: Arc::new(connector),
68 pseudonym_service,
69 nest_auth_token,
70 }
71 }
72
73 pub fn configure_service(&self, cfg: &mut ServiceConfig) {
75 let auth_token = self.nest_auth_token.clone();
76 let connector = self.connector.clone();
77
78 let auth = HttpAuthentication::bearer(move |req, credentials| {
79 let auth_token = auth_token.clone();
80 async move {
81 if credentials.token() == auth_token {
82 Ok(req)
83 } else {
84 Err((ErrorUnauthorized("Invalid token"), req))
85 }
86 }
87 });
88
89 cfg.app_data(web::Data::new(connector)).service(
90 web::scope("/api")
91 .app_data(web::Data::new(self.pseudonym_service.clone()))
92 .wrap(auth)
93 .route(
94 "/activities/create",
95 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
96 let connector = connector.clone();
97 async move {
98 handle_pep::<
99 ActivityRequestMessageType,
100 ActivityResponseMessageType,
101 _,
102 _,
103 >(
104 |r| connector.create_participant_activity(r), req, ps
105 )
106 .await
107 }
108 }),
109 )
110 .route(
111 "/activities/create_batch",
112 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
113 let connector = connector.clone();
114 async move {
115 handle_pep_batch::<
116 ActivityBatchRequestMessageType,
117 ActivityBatchResponseMessageType,
118 PEPPseudoDataPoint,
119 PseudoDataPoint,
120 _,
121 _,
122 >(
123 |r| connector.create_participant_activity_batch(r), req, ps
124 )
125 .await
126 }
127 }),
128 )
129 .route(
130 "/activities/update",
131 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
132 let connector = connector.clone();
133 async move {
134 handle_pep::<
135 ActivityRequestMessageType,
136 ActivityResponseMessageType,
137 _,
138 _,
139 >(
140 |r| connector.update_participant_activity(r), req, ps
141 )
142 .await
143 }
144 }),
145 )
146 .route(
147 "/activities/update_batch",
148 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
149 let connector = connector.clone();
150 async move {
151 handle_pep_batch::<
152 ActivityBatchRequestMessageType,
153 ActivityBatchResponseMessageType,
154 PEPPseudoDataPoint,
155 PseudoDataPoint,
156 _,
157 _,
158 >(
159 |r| connector.update_participant_activity_batch(r), req, ps
160 )
161 .await
162 }
163 }),
164 )
165 .route(
166 "/activities/delete",
167 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
168 let connector = connector.clone();
169 async move {
170 handle_pep::<
171 ActivityRequestMessageType,
172 ActivityResponseMessageType,
173 _,
174 _,
175 >(
176 |r| connector.delete_participant_activity(r), req, ps
177 )
178 .await
179 }
180 }),
181 )
182 .route(
183 "/activities/delete_batch",
184 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
185 let connector = connector.clone();
186 async move {
187 handle_pep_batch::<
188 ActivityBatchRequestMessageType,
189 ActivityBatchResponseMessageType,
190 PEPPseudoDataPoint,
191 PseudoDataPoint,
192 _,
193 _,
194 >(
195 |r| connector.delete_participant_activity_batch(r), req, ps
196 )
197 .await
198 }
199 }),
200 )
201 .route(
202 "/activities/details",
203 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
204 let connector = connector.clone();
205 async move {
206 handle_pep::<
207 ActivityRequestMessageType,
208 ActivityResponseMessageType,
209 _,
210 _,
211 >(
212 |r| connector.participant_activity_details(r), req, ps
213 )
214 .await
215 }
216 }),
217 )
218 .route(
219 "/activities/details_batch",
220 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
221 let connector = connector.clone();
222 async move {
223 handle_pep_batch::<
224 ActivityBatchRequestMessageType,
225 ActivityBatchResponseMessageType,
226 PEPPseudoDataPoint,
227 PseudoDataPoint,
228 _,
229 _,
230 >(
231 |r| connector.participant_activity_details_batch(r), req, ps
232 )
233 .await
234 }
235 }),
236 )
237 .route(
238 "/activities/progress",
239 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
240 let connector = connector.clone();
241 async move {
242 handle_pep::<
243 ActivityRequestMessageType,
244 ActivityResponseMessageType,
245 _,
246 _,
247 >(
248 |r| connector.participant_activity_progress(r), req, ps
249 )
250 .await
251 }
252 }),
253 )
254 .route(
255 "/activities/progress_batch",
256 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
257 let connector = connector.clone();
258 async move {
259 handle_pep_batch::<
260 ActivityBatchRequestMessageType,
261 ActivityBatchResponseMessageType,
262 PEPPseudoDataPoint,
263 PseudoDataPoint,
264 _,
265 _,
266 >(
267 |r| connector.participant_activity_progress_batch(r),
268 req,
269 ps,
270 )
271 .await
272 }
273 }),
274 )
275 .route(
276 "/activities/result",
277 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
278 let connector = connector.clone();
279 async move {
280 handle_pep::<
281 ActivityRequestMessageType,
282 ActivityResponseMessageType,
283 _,
284 _,
285 >(
286 |r| connector.participant_activity_result(r), req, ps
287 )
288 .await
289 }
290 }), )
292 .route(
293 "/activities/result_batch",
294 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
295 let connector = connector.clone();
296 async move {
297 handle_pep_batch::<
298 ActivityBatchRequestMessageType,
299 ActivityBatchResponseMessageType,
300 PEPPseudoDataPoint,
301 PseudoDataPoint,
302 _,
303 _,
304 >(
305 |r| connector.participant_activity_result_batch(r), req, ps
306 )
307 .await
308 }
309 }),
310 ),
311 );
312 }
313
314 pub async fn run_standalone(self, host: &str, port: u16) -> std::io::Result<()> {
316 use actix_web::{App, HttpServer};
317
318 HttpServer::new(move || {
319 App::new()
320 .wrap(Logger::default())
321 .configure(|cfg| self.configure_service(cfg))
322 })
323 .bind((host, port))?
324 .run()
325 .await
326 }
327}
328
329async fn handle_pep<Req, Res, F, Fut>(
330 handle: F,
331 request: web::Json<Req::PEPMessage>,
332 pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
333) -> Result<HttpResponse, ApiError>
334where
335 Req: PEPMessageType, Res: PEPMessageType, F: FnOnce(Req::Message) -> Fut, Fut: Future<Output = Result<Option<Res::Message>, ApiError>>, {
340 let rng = &mut OsRng;
341 let pep_request = request.into_inner();
342 let domain_to = pep_request.domain_to().clone();
343
344 let mut ps = pseudonym_service.lock().await;
345
346 let request = Req::unpack(pep_request, &mut ps).await?;
347 let response: Option<Res::Message> = handle(request).await?;
348
349 if let Some(response) = response {
350 let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
351 Ok(HttpResponse::Ok().json(pep_response))
352 } else {
353 Ok(HttpResponse::Ok().finish())
354 }
355}
356
357async fn handle_pep_batch<Req, Res, PepT, T, F, Fut>(
358 handle: F,
359 request: web::Json<Req::PEPBatchMessage>,
360 pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
361) -> Result<HttpResponse, ApiError>
362where
363 Req: PEPBatchMessageType<PepT, T>, Res: PEPBatchMessageType<PepT, T>, F: FnOnce(Req::BatchMessage) -> Fut, Fut: Future<Output = Result<Option<Res::BatchMessage>, ApiError>>, {
368 let rng = &mut OsRng;
369 let pep_request = request.into_inner();
370
371 let domain_to = pep_request.domain_to().clone();
372
373 let mut ps = pseudonym_service.lock().await;
374
375 let request = Req::unpack(pep_request, &mut ps).await?;
376 let response: Option<Res::BatchMessage> = handle(request).await?;
377
378 if let Some(response) = response {
379 let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
380 Ok(HttpResponse::Ok().json(pep_response))
381 } else {
382 Ok(HttpResponse::Ok().finish())
383 }
384}
385
/// Extension trait for converting a pseudonym-service result into a result
/// carrying an [`ApiError`].
pub(crate) trait PseudonymServiceErrorHandler<T> {
    /// Consumes `self` and returns the success value unchanged, or an
    /// [`ApiError`] describing the failure.
    fn handle_pseudonym_error(self) -> Result<T, ApiError>;
}
389
390impl<T> PseudonymServiceErrorHandler<T> for Result<T, PseudonymServiceError> {
391 fn handle_pseudonym_error(self) -> Result<T, ApiError> {
392 self.map_err(|e| {
394 error!("Pseudonym service error: {e:?}");
395 Internal("Pseudonym service operation failed".to_string())
396 })
397 }
398}