nest_data_source_api/api.rs
1use crate::api::ApiError::Internal;
2use crate::data_source::DataSource;
3use crate::messages::{
4 ActivityRequestMessageType, ActivityResponseMessageType, HasPEPParticipantInfo, PEPMessageType,
5 ParticipantRequestMessageType, ParticipantResponseMessageType,
6};
7use crate::pseudonym_service_factory::{create_pseudonym_service, OauthAuth};
8use actix_web::web::ServiceConfig;
9use actix_web::{error::ErrorUnauthorized, middleware::Logger, web, HttpResponse, ResponseError};
10use actix_web_httpauth::middleware::HttpAuthentication;
11use log::error;
12use oauth_token_service::TokenServiceConfig;
13use paas_client::prelude::PAASConfig;
14use paas_client::pseudonym_service::{PseudonymService, PseudonymServiceError};
15use rand::rngs::OsRng;
16use std::fmt::{Display, Formatter};
17use std::future::Future;
18use std::sync::Arc;
19use tokio::sync::Mutex;
20
/// Errors an API handler can return to the HTTP layer.
///
/// `Clone`, `PartialEq` and `Eq` are derived in addition to `Debug` so that
/// errors can be duplicated and compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApiError {
    /// The requested resource does not exist.
    NotFound,
    /// Something failed on the server side; the string is the error message.
    Internal(String),
    /// The request was rejected; the string is the error message.
    BadRequest(String),
}
27
28impl Display for ApiError {
29 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
30 match self {
31 ApiError::NotFound => write!(f, "Not found"),
32 ApiError::Internal(msg) => write!(f, "Internal error: {}", msg),
33 ApiError::BadRequest(msg) => write!(f, "Bad request: {}", msg),
34 }
35 }
36}
37impl ResponseError for ApiError {
38 fn error_response(&self) -> HttpResponse {
39 match self {
40 ApiError::NotFound => HttpResponse::NotFound().json("Not found"),
41 ApiError::Internal(msg) => HttpResponse::InternalServerError().json(msg),
42 ApiError::BadRequest(msg) => HttpResponse::BadRequest().json(msg),
43 }
44 }
45}
46#[derive(Clone)]
47pub struct DataSourceAPI {
48 connector: Arc<dyn DataSource>,
49 pseudonym_service: Arc<Mutex<PseudonymService>>,
50 nest_auth_token: String,
51}
52
53impl DataSourceAPI {
54 /// Create a new instance of the DataSourceAPI.
55 pub async fn init(
56 connector: impl DataSource,
57 nest_auth_token: String,
58 ps_config: PAASConfig,
59 oauth_config: TokenServiceConfig,
60 ) -> Self {
61 let oauth_auth = OauthAuth::new(oauth_config);
62 let pseudonym_service = create_pseudonym_service(oauth_auth, ps_config).await;
63 Self {
64 connector: Arc::new(connector),
65 pseudonym_service,
66 nest_auth_token,
67 }
68 }
69
70 /// Configure an actix-web service with API routes and authentication middleware.
71 pub fn configure_service(&self, cfg: &mut ServiceConfig) {
72 let auth_token = self.nest_auth_token.clone();
73 let connector = self.connector.clone();
74
75 let auth = HttpAuthentication::bearer(move |req, credentials| {
76 let auth_token = auth_token.clone();
77 async move {
78 if credentials.token() == auth_token {
79 Ok(req)
80 } else {
81 Err((ErrorUnauthorized("Invalid token"), req))
82 }
83 }
84 });
85
86 cfg.app_data(web::Data::new(connector)).service(
87 web::scope("/api")
88 .app_data(web::Data::new(self.pseudonym_service.clone()))
89 .wrap(auth)
90 .route(
91 "/participant/create",
92 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
93 let connector = connector.clone();
94 async move {
95 handle_pep::<
96 ParticipantRequestMessageType,
97 ParticipantResponseMessageType,
98 _,
99 _,
100 >(
101 |r| connector.create_local_participant(r), req, ps
102 )
103 .await
104 }
105 }),
106 )
107 .route(
108 "/participant/update",
109 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
110 let connector = connector.clone();
111 async move {
112 handle_pep::<
113 ParticipantRequestMessageType,
114 ParticipantResponseMessageType,
115 _,
116 _,
117 >(
118 |r| connector.update_local_participant(r), req, ps
119 )
120 .await
121 }
122 }),
123 )
124 .route(
125 "/participant/delete",
126 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
127 let connector = connector.clone();
128 async move {
129 handle_pep::<
130 ParticipantRequestMessageType,
131 ParticipantResponseMessageType,
132 _,
133 _,
134 >(
135 |r| connector.delete_local_participant(r), req, ps
136 )
137 .await
138 }
139 }),
140 )
141 .route(
142 "/participant/details",
143 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
144 let connector = connector.clone();
145 async move {
146 handle_pep::<
147 ParticipantRequestMessageType,
148 ParticipantResponseMessageType,
149 _,
150 _,
151 >(
152 |r| connector.local_participant_details(r), req, ps
153 )
154 .await
155 }
156 }),
157 )
158 .route(
159 "/participant/activities/create",
160 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
161 let connector = connector.clone();
162 async move {
163 handle_pep::<
164 ActivityRequestMessageType,
165 ActivityResponseMessageType,
166 _,
167 _,
168 >(
169 |r| connector.create_participant_activity(r), req, ps
170 )
171 .await
172 }
173 }),
174 )
175 .route(
176 "/participant/activities/update",
177 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
178 let connector = connector.clone();
179 async move {
180 handle_pep::<
181 ActivityRequestMessageType,
182 ActivityResponseMessageType,
183 _,
184 _,
185 >(
186 |r| connector.update_participant_activity(r), req, ps
187 )
188 .await
189 }
190 }),
191 )
192 .route(
193 "/participant/activities/delete",
194 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
195 let connector = connector.clone();
196 async move {
197 handle_pep::<
198 ActivityRequestMessageType,
199 ActivityResponseMessageType,
200 _,
201 _,
202 >(
203 |r| connector.delete_participant_activity(r), req, ps
204 )
205 .await
206 }
207 }),
208 )
209 .route(
210 "/participant/activities/details",
211 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
212 let connector = connector.clone();
213 async move {
214 handle_pep::<
215 ActivityRequestMessageType,
216 ActivityResponseMessageType,
217 _,
218 _,
219 >(
220 |r| connector.participant_activity_details(r), req, ps
221 )
222 .await
223 }
224 }),
225 )
226 .route(
227 "/participant/activities/progress",
228 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
229 let connector = connector.clone();
230 async move {
231 handle_pep::<
232 ActivityRequestMessageType,
233 ActivityResponseMessageType,
234 _,
235 _,
236 >(
237 |r| connector.participant_activity_progress(r), req, ps
238 )
239 .await
240 }
241 }),
242 )
243 .route(
244 "/participant/activities/result",
245 web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
246 let connector = connector.clone();
247 async move {
248 handle_pep::<
249 ActivityRequestMessageType,
250 ActivityResponseMessageType,
251 _,
252 _,
253 >(
254 |r| connector.participant_activity_result(r), req, ps
255 )
256 .await
257 }
258 }), // TODO: maybe we should require different authentication for this route, since we only want end-users to access it
259 ), // TODO: same routes for batches of participants
260 // .route("/participants/create", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
261 // let connector = connector.clone();
262 // async move { handle(|r| connector.create_local_participants(r), req, ps).await }
263 // }))
264 // .route("/participants/update", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
265 // let connector = connector.clone();
266 // async move { handle(|r| connector.update_participants(r), req, ps).await }
267 // }))
268 // .route("/participants/delete", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
269 // let connector = connector.clone();
270 // async move { handle(|r| connector.delete_participants(r), req, ps).await }
271 // }))
272 // .route("/participants/details", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
273 // let connector = connector.clone();
274 // async move { handle(|r| connector.delete_participants(r), req, ps).await }
275 // }))
276 // .route("/participants/activities/create", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
277 // let connector = connector.clone();
278 // async move { handle(|r| connector.create_participants_activity(r), req, ps).await }
279 // }))
280 // .route("/participants/activities/update", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
281 // let connector = connector.clone();
282 // async move { handle(|r| connector.update_participants_activity(r), req, ps).await }
283 // }))
284 // .route("/participants/activities/delete", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
285 // let connector = connector.clone();
286 // async move { handle(|r| connector.delete_participants_activity(r), req, ps).await }
287 // }))
288 // .route("/participants/activities/details", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
289 // let connector = connector.clone();
290 // async move { handle(|r| connector.participants_activity_details(r), req, ps).await }
291 // }))
292 // .route("/participants/activities/progress", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
293 // let connector = connector.clone();
294 // async move { handle(|r| connector.participants_activity_progress(r), req, ps).await }
295 // }))
296 // .route("/participants/activities/result", web::post().to(|connector: web::Data<Arc<dyn DataSource>>, req, ps| {
297 // let connector = connector.clone();
298 // async move { handle(|r| connector.participants_activity_result(r), req, ps).await }
299 // }))
300 );
301 }
302
303 /// Run as a standalone server (if needed)
304 pub async fn run_standalone(self, host: &str, port: u16) -> std::io::Result<()> {
305 use actix_web::{App, HttpServer};
306
307 HttpServer::new(move || {
308 App::new()
309 .wrap(Logger::default())
310 .configure(|cfg| self.configure_service(cfg))
311 })
312 .bind((host, port))?
313 .run()
314 .await
315 }
316}
317
318async fn handle_pep<Req, Res, F, Fut>(
319 handle: F,
320 request: web::Json<Req::PEPMessage>,
321 pseudonym_service: web::Data<Arc<Mutex<PseudonymService>>>,
322) -> Result<HttpResponse, ApiError>
323where
324 Req: PEPMessageType, // Request message type
325 Res: PEPMessageType, // Response message type
326 F: FnOnce(Req::Message) -> Fut, // Async handler type
327 Fut: Future<Output = Result<Option<Res::Message>, ApiError>>, // Handler output
328{
329 let rng = &mut OsRng;
330 let pep_request = request.into_inner();
331 let domain_to = pep_request.domain_to().clone();
332
333 let mut ps = pseudonym_service.lock().await;
334
335 let request = Req::unpack(pep_request, &mut ps).await?;
336 let response: Option<Res::Message> = handle(request).await?;
337
338 if let Some(response) = response {
339 let pep_response = Res::pack(response, domain_to, &mut ps, rng).await?;
340 Ok(HttpResponse::Ok().json(pep_response))
341 } else {
342 Ok(HttpResponse::Ok().finish())
343 }
344}
345
346pub(crate) trait PseudonymServiceErrorHandler<T> {
347 fn handle_pseudonym_error(self) -> Result<T, ApiError>;
348}
349
350impl<T> PseudonymServiceErrorHandler<T> for Result<T, PseudonymServiceError> {
351 fn handle_pseudonym_error(self) -> Result<T, ApiError> {
352 // TODO: Specific error handling
353 self.map_err(|e| {
354 error!("Pseudonym service error: {e:?}");
355 Internal("Pseudonym service operation failed".to_string())
356 })
357 }
358}