Skip to main content

nest_data_source_api/
api.rs

1use crate::api::ApiError::Internal;
2use crate::batch_messages::{
3    ActivityBatchRequestMessageType, ActivityBatchResponseMessageType, HasPEPBatchInfo,
4    PEPBatchMessageType, Record,
5};
6use crate::data_source::DataSource;
7use crate::messages::{
8    ActivityRequestMessageType, ActivityResponseMessageType, HasPEPParticipantInfo, PEPMessageType,
9};
10use crate::pseudonym_service_factory::{create_pseudonym_service_pool, OauthAuth};
11use crate::pseudonym_service_pool::PseudonymServicePool;
12use actix_web::web::ServiceConfig;
13use actix_web::{error::ErrorUnauthorized, middleware::Logger, web, HttpResponse, ResponseError};
14use actix_web_httpauth::middleware::HttpAuthentication;
15use libpep::data::json::EncryptedPEPJSONValue;
16use log::error;
17use oauth_token_service::TokenServiceConfig;
18use paas_client::prelude::PAASConfig;
19use paas_client::pseudonym_service::PseudonymServiceError;
20use std::fmt::{Display, Formatter};
21use std::future::Future;
22use std::sync::Arc;
23
24#[derive(Debug)]
25pub enum ApiError {
26    NotFound,
27    Internal(String),
28    BadRequest(String),
29}
30
31impl Display for ApiError {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        match self {
34            ApiError::NotFound => write!(f, "Not found"),
35            ApiError::Internal(msg) => write!(f, "Internal error: {}", msg),
36            ApiError::BadRequest(msg) => write!(f, "Bad request: {}", msg),
37        }
38    }
39}
40impl ResponseError for ApiError {
41    fn error_response(&self) -> HttpResponse {
42        match self {
43            ApiError::NotFound => HttpResponse::NotFound().json("Not found"),
44            ApiError::Internal(msg) => HttpResponse::InternalServerError().json(msg),
45            ApiError::BadRequest(msg) => HttpResponse::BadRequest().json(msg),
46        }
47    }
48}
49#[derive(Clone)]
50pub struct DataSourceAPI {
51    connector: Arc<dyn DataSource>,
52    pseudonym_service: PseudonymServicePool,
53    nest_auth_token: String,
54}
55
56impl DataSourceAPI {
57    /// Create a new instance of the DataSourceAPI.
58    ///
59    /// The `pool_size` parameter determines how many concurrent PseudonymService instances
60    /// will be created. A larger pool allows more concurrent operations but uses more resources.
61    /// Recommended values: 4-8 for most use cases.
62    pub async fn init(
63        connector: impl DataSource,
64        nest_auth_token: String,
65        ps_config: PAASConfig,
66        oauth_config: TokenServiceConfig,
67        pool_size: usize,
68    ) -> Self {
69        let oauth_auth = OauthAuth::new(Some(oauth_config))
70            .await
71            .expect("Failed to create OAuth authentication");
72        let pseudonym_service =
73            create_pseudonym_service_pool(oauth_auth, ps_config, pool_size).await;
74        Self {
75            connector: Arc::new(connector),
76            pseudonym_service,
77            nest_auth_token,
78        }
79    }
80
81    /// Configure an actix-web service with API routes and authentication middleware.
82    pub fn configure_service(&self, cfg: &mut ServiceConfig) {
83        let auth_token = self.nest_auth_token.clone();
84        let connector = self.connector.clone();
85
86        let auth = HttpAuthentication::bearer(move |req, credentials| {
87            let auth_token = auth_token.clone();
88            async move {
89                if credentials.token() == auth_token {
90                    Ok(req)
91                } else {
92                    Err((ErrorUnauthorized("Invalid token"), req))
93                }
94            }
95        });
96
97        cfg.app_data(web::Data::new(connector)).service(
98            web::scope("/api")
99                .app_data(web::Data::new(self.pseudonym_service.clone()))
100                .wrap(auth)
101                .route(
102                    "/activities/create",
103                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
104                        let connector = connector.clone();
105                        async move {
106                            handle_pep::<
107                                ActivityRequestMessageType,
108                                ActivityResponseMessageType,
109                                _,
110                                _,
111                            >(
112                                |r| connector.create_participant_activity(r), req, ps
113                            )
114                            .await
115                        }
116                    }),
117                )
118                .route(
119                    "/activities/create_batch",
120                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
121                        let connector = connector.clone();
122                        async move {
123                            handle_pep_batch::<
124                                ActivityBatchRequestMessageType,
125                                ActivityBatchResponseMessageType,
126                                EncryptedPEPJSONValue,
127                                Record,
128                                _,
129                                _,
130                            >(
131                                |r| connector.create_participant_activity_batch(r), req, ps
132                            )
133                            .await
134                        }
135                    }),
136                )
137                .route(
138                    "/activities/update",
139                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
140                        let connector = connector.clone();
141                        async move {
142                            handle_pep::<
143                                ActivityRequestMessageType,
144                                ActivityResponseMessageType,
145                                _,
146                                _,
147                            >(
148                                |r| connector.update_participant_activity(r), req, ps
149                            )
150                            .await
151                        }
152                    }),
153                )
154                .route(
155                    "/activities/update_batch",
156                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
157                        let connector = connector.clone();
158                        async move {
159                            handle_pep_batch::<
160                                ActivityBatchRequestMessageType,
161                                ActivityBatchResponseMessageType,
162                                EncryptedPEPJSONValue,
163                                Record,
164                                _,
165                                _,
166                            >(
167                                |r| connector.update_participant_activity_batch(r), req, ps
168                            )
169                            .await
170                        }
171                    }),
172                )
173                .route(
174                    "/activities/delete",
175                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
176                        let connector = connector.clone();
177                        async move {
178                            handle_pep::<
179                                ActivityRequestMessageType,
180                                ActivityResponseMessageType,
181                                _,
182                                _,
183                            >(
184                                |r| connector.delete_participant_activity(r), req, ps
185                            )
186                            .await
187                        }
188                    }),
189                )
190                .route(
191                    "/activities/delete_batch",
192                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
193                        let connector = connector.clone();
194                        async move {
195                            handle_pep_batch::<
196                                ActivityBatchRequestMessageType,
197                                ActivityBatchResponseMessageType,
198                                EncryptedPEPJSONValue,
199                                Record,
200                                _,
201                                _,
202                            >(
203                                |r| connector.delete_participant_activity_batch(r), req, ps
204                            )
205                            .await
206                        }
207                    }),
208                )
209                .route(
210                    "/activities/details",
211                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
212                        let connector = connector.clone();
213                        async move {
214                            handle_pep::<
215                                ActivityRequestMessageType,
216                                ActivityResponseMessageType,
217                                _,
218                                _,
219                            >(
220                                |r| connector.participant_activity_details(r), req, ps
221                            )
222                            .await
223                        }
224                    }),
225                )
226                .route(
227                    "/activities/details_batch",
228                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
229                        let connector = connector.clone();
230                        async move {
231                            handle_pep_batch::<
232                                ActivityBatchRequestMessageType,
233                                ActivityBatchResponseMessageType,
234                                EncryptedPEPJSONValue,
235                                Record,
236                                _,
237                                _,
238                            >(
239                                |r| connector.participant_activity_details_batch(r), req, ps
240                            )
241                            .await
242                        }
243                    }),
244                )
245                .route(
246                    "/activities/progress",
247                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
248                        let connector = connector.clone();
249                        async move {
250                            handle_pep::<
251                                ActivityRequestMessageType,
252                                ActivityResponseMessageType,
253                                _,
254                                _,
255                            >(
256                                |r| connector.participant_activity_progress(r), req, ps
257                            )
258                            .await
259                        }
260                    }),
261                )
262                .route(
263                    "/activities/progress_batch",
264                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
265                        let connector = connector.clone();
266                        async move {
267                            handle_pep_batch::<
268                                ActivityBatchRequestMessageType,
269                                ActivityBatchResponseMessageType,
270                                EncryptedPEPJSONValue,
271                                Record,
272                                _,
273                                _,
274                            >(
275                                |r| connector.participant_activity_progress_batch(r),
276                                req,
277                                ps,
278                            )
279                            .await
280                        }
281                    }),
282                )
283                .route(
284                    "/activities/result",
285                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
286                        let connector = connector.clone();
287                        async move {
288                            handle_pep::<
289                                ActivityRequestMessageType,
290                                ActivityResponseMessageType,
291                                _,
292                                _,
293                            >(
294                                |r| connector.participant_activity_result(r), req, ps
295                            )
296                            .await
297                        }
298                    }), // TODO: maybe we should require different authentication for this route, since we only want end-users to access it
299                )
300                .route(
301                    "/activities/result_batch",
302                    web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
303                        let connector = connector.clone();
304                        async move {
305                            handle_pep_batch::<
306                                ActivityBatchRequestMessageType,
307                                ActivityBatchResponseMessageType,
308                                EncryptedPEPJSONValue,
309                                Record,
310                                _,
311                                _,
312                            >(
313                                |r| connector.participant_activity_result_batch(r), req, ps
314                            )
315                            .await
316                        }
317                    }),
318                ),
319        );
320    }
321
322    /// Run as a standalone server (if needed)
323    pub async fn run_standalone(self, host: &str, port: u16) -> std::io::Result<()> {
324        use actix_web::{App, HttpServer};
325
326        HttpServer::new(move || {
327            App::new()
328                .wrap(Logger::default())
329                .configure(|cfg| self.configure_service(cfg))
330        })
331        .bind((host, port))?
332        .run()
333        .await
334    }
335}
336
337async fn handle_pep<Req, Res, F, Fut>(
338    handle: F,
339    request: web::Json<Req::PEPMessage>,
340    pseudonym_service: web::Data<PseudonymServicePool>,
341) -> Result<HttpResponse, ApiError>
342where
343    Req: PEPMessageType,            // Request message type
344    Res: PEPMessageType,            // Response message type
345    F: FnOnce(Req::Message) -> Fut, // Async handler type
346    Fut: Future<Output = Result<Option<Res::Message>, ApiError>>, // Handler output
347{
348    let mut rng = rand::rng();
349    let pep_request = request.into_inner();
350    let domain_to = pep_request.domain_to().clone();
351
352    let request = {
353        let mut ps = pseudonym_service.acquire().await;
354        Req::unpack(pep_request, &mut ps).await?
355    };
356
357    let response: Option<Res::Message> = handle(request).await?;
358
359    if let Some(response) = response {
360        let mut ps = pseudonym_service.acquire().await;
361        let pep_response = Res::pack(response, domain_to, &mut ps, &mut rng).await?;
362        Ok(HttpResponse::Ok().json(pep_response))
363    } else {
364        Ok(HttpResponse::Ok().finish())
365    }
366}
367
368async fn handle_pep_batch<Req, Res, PepT, T, F, Fut>(
369    handle: F,
370    request: web::Json<Req::PEPBatchMessage>,
371    pseudonym_service: web::Data<PseudonymServicePool>,
372) -> Result<HttpResponse, ApiError>
373where
374    Req: PEPBatchMessageType<PepT, T>,   // Request message type
375    Res: PEPBatchMessageType<PepT, T>,   // Response message type
376    F: FnOnce(Req::BatchMessage) -> Fut, // Async handler type
377    Fut: Future<Output = Result<Option<Res::BatchMessage>, ApiError>>, // Handler output
378{
379    let pep_request = request.into_inner();
380
381    let domain_from = pep_request.domain().clone();
382
383    let request = {
384        let mut ps = pseudonym_service.acquire().await;
385        Req::unpack(pep_request, &mut ps).await?
386    };
387
388    let response: Option<Res::BatchMessage> = handle(request).await?;
389
390    if let Some(response) = response {
391        let mut ps = pseudonym_service.acquire().await;
392        let pep_response = Res::pack(response, domain_from, &mut ps)?;
393        Ok(HttpResponse::Ok().json(pep_response))
394    } else {
395        Ok(HttpResponse::Ok().finish())
396    }
397}
398
399pub(crate) trait PseudonymServiceErrorHandler<T> {
400    fn handle_pseudonym_error(self) -> Result<T, ApiError>;
401}
402
403impl<T> PseudonymServiceErrorHandler<T> for Result<T, PseudonymServiceError> {
404    fn handle_pseudonym_error(self) -> Result<T, ApiError> {
405        // TODO: Specific error handling
406        self.map_err(|e| {
407            error!("Pseudonym service error: {e:?}");
408            Internal("Pseudonym service operation failed".to_string())
409        })
410    }
411}