edc_dataplane_proxy/service/
edr.rs1use bon::Builder;
2use chrono::{Duration, Utc};
3use edc_dataplane_core::{
4 core::model::{
5 namespace::{EDC_NAMESPACE, IDSA_NAMESPACE},
6 transfer::Transfer,
7 },
8 signaling::{DataAddress, EndpointProperty},
9};
10use thiserror::Error;
11use uuid::Uuid;
12
13use crate::{
14 db::edr::EdrRepoRef,
15 model::{
16 edr::{Edr, EdrClaims, EdrEntry, RefreshTokenId, TokenId},
17 token::{TokenRequest, TokenResponse},
18 },
19};
20
21use super::token::{TokenError, TokenManager};
22
23#[derive(Clone, Builder)]
24pub struct EdrManager<T: TokenManager> {
25 #[builder(into)]
26 pub(crate) proxy_url: String,
27 #[builder(into)]
28 pub(crate) token_url: String,
29 #[builder(into)]
30 issuer: String,
31 #[builder(into)]
32 pub(crate) jwks_url: String,
33 pub(crate) tokens: T,
34 token_duration: Duration,
35 #[builder(default = Duration::days(30))]
36 refresh_token_duration: Duration,
37 store: EdrRepoRef,
38}
39
40impl<T: TokenManager> EdrManager<T> {
41 pub async fn create_edr(&self, req: &Transfer) -> Result<Edr, EdrError> {
42 let token_id: TokenId = Uuid::new_v4().into();
43 let refresh_token_id: RefreshTokenId = Uuid::new_v4().into();
44
45 let data_address = DataAddress::builder()
46 .endpoint_type(IDSA_NAMESPACE.to_iri("HTTP"))
47 .endpoint_properties(self.endpoint_properties(token_id, refresh_token_id, req)?)
48 .build();
49
50 Ok(Edr::builder()
51 .token_id(token_id)
52 .refresh_token_id(refresh_token_id)
53 .data_address(data_address)
54 .build())
55 }
56
57 pub async fn get_by_transfer_id(&self, transfer_id: &str) -> anyhow::Result<Option<EdrEntry>> {
58 self.store.fetch_by_id(transfer_id).await
59 }
60
61 pub async fn save(&self, edr: EdrEntry) -> anyhow::Result<()> {
62 self.store.save(edr).await
63 }
64
65 pub async fn delete(&self, transfer_id: &str) -> anyhow::Result<()> {
66 self.store.delete(transfer_id).await
67 }
68
69 pub async fn refresh_token(&self, req: TokenRequest) -> Result<TokenResponse, EdrError> {
70 let token_id: TokenId = Uuid::new_v4().into();
71 let refresh_token_id: RefreshTokenId = Uuid::new_v4().into();
72
73 let claims = self.tokens.validate::<EdrClaims>(&req.refresh_token)?;
74
75 self.issue_token(
76 token_id,
77 refresh_token_id,
78 &claims.claims.sub,
79 &claims.claims.transfer_id,
80 )
81 }
82
83 fn endpoint_properties(
84 &self,
85 token_id: TokenId,
86 refresh_token_id: RefreshTokenId,
87 transfer: &Transfer,
88 ) -> Result<Vec<EndpointProperty>, EdrError> {
89 let token_response = self.issue_token(
90 token_id,
91 refresh_token_id,
92 &transfer.participant_id,
93 &transfer.id,
94 )?;
95 Ok(vec![
96 EndpointProperty::builder()
97 .name(EDC_NAMESPACE.to_iri("endpoint"))
98 .value(self.proxy_url.clone())
99 .build(),
100 EndpointProperty::builder()
101 .name(EDC_NAMESPACE.to_iri("access_token"))
102 .value(token_response.access_token)
103 .build(),
104 EndpointProperty::builder()
105 .name(EDC_NAMESPACE.to_iri("token_type"))
106 .value("Bearer")
107 .build(),
108 EndpointProperty::builder()
109 .name(EDC_NAMESPACE.to_iri("refresh_token"))
110 .value(token_response.refresh_token)
111 .build(),
112 EndpointProperty::builder()
113 .name(EDC_NAMESPACE.to_iri("refresh_endpoint"))
114 .value(self.token_url.clone())
115 .build(),
116 EndpointProperty::builder()
117 .name(EDC_NAMESPACE.to_iri("expires_in"))
118 .value(token_response.expires_in)
119 .build(),
120 EndpointProperty::builder()
121 .name(EDC_NAMESPACE.to_iri("jwks_url"))
122 .value(self.jwks_url.clone())
123 .build(),
124 ])
125 }
126
127 fn issue_access_token(
128 &self,
129 id: TokenId,
130 participant_id: &str,
131 process_id: &str,
132 ) -> Result<String, EdrError> {
133 self.issue_generic_token(id.into(), participant_id, process_id, self.token_duration)
134 }
135
136 fn issue_generic_token(
137 &self,
138 jti: Uuid,
139 participant_id: &str,
140 process_id: &str,
141 duration: Duration,
142 ) -> Result<String, EdrError> {
143 let now = Utc::now();
144 let exp = now
145 .checked_add_signed(duration)
146 .ok_or_else(|| anyhow::anyhow!("Error adding {}", self.token_duration))
147 .map_err(EdrError::Generic)?;
148
149 let claims = EdrClaims::builder()
150 .jti(jti)
151 .iss(self.issuer.clone())
152 .aud(self.proxy_url.clone())
153 .sub(participant_id.to_string())
154 .exp(exp.timestamp())
155 .iat(now.timestamp())
156 .transfer_id(process_id.to_string())
157 .build();
158
159 self.tokens.issue(&claims).map(Ok)?
160 }
161
162 pub(crate) fn issue_token(
163 &self,
164 token_id: TokenId,
165 refresh_token_id: RefreshTokenId,
166 participant_id: &str,
167 process_id: &str,
168 ) -> Result<TokenResponse, EdrError> {
169 let access_token = self.issue_access_token(token_id, participant_id, process_id)?;
170 let refresh_token =
171 self.issue_refresh_token(refresh_token_id, participant_id, process_id)?;
172
173 Ok(TokenResponse {
174 access_token,
175 refresh_token,
176 expires_in: self.token_duration.num_seconds().to_string(),
177 })
178 }
179
180 fn issue_refresh_token(
181 &self,
182 id: RefreshTokenId,
183 participant_id: &str,
184 process_id: &str,
185 ) -> Result<String, EdrError> {
186 self.issue_generic_token(
187 id.into(),
188 participant_id,
189 process_id,
190 self.refresh_token_duration,
191 )
192 }
193}
194
195#[derive(Error, Debug)]
196pub enum EdrError {
197 #[error("Generic error")]
198 Generic(anyhow::Error),
199 #[error(transparent)]
200 Token(#[from] TokenError),
201}
202
203#[cfg(test)]
204mod tests {
205
206 use super::*;
207 use crate::{db::edr::MockEdrRepo, service::token::MockTokenManager};
208 use chrono::Duration;
209 use edc_dataplane_core::core::model::transfer::TransferStatus;
210 use jsonwebtoken::errors::ErrorKind;
211
212 #[tokio::test]
213 async fn test_create_edr() {
214 let mut token_manager = MockTokenManager::new();
215 let store = MockEdrRepo::new();
216
217 token_manager
218 .expect_issue::<EdrClaims>()
219 .withf(|claims| {
220 claims.iss == "issuer".to_string()
221 && claims.aud == "http://localhost:8080/public".to_string()
222 && claims.sub == "participant_id".to_string()
223 && claims.transfer_id == "process_id".to_string()
224 })
225 .returning(|_: &EdrClaims| Ok("token".to_string()));
226
227 let edr_manager = EdrManager::builder()
228 .proxy_url("http://localhost:8080/public")
229 .issuer("issuer")
230 .tokens(token_manager)
231 .token_duration(Duration::hours(1))
232 .token_url("http://localhost:8080/token")
233 .jwks_url("http://localhost:8080/.well-known/jwks.json")
234 .store(EdrRepoRef::of(store))
235 .build();
236
237 let req = create_transfer();
238
239 let edr = edr_manager.create_edr(&req).await.unwrap();
240
241 assert_eq!(
242 edr.data_address.endpoint_type,
243 IDSA_NAMESPACE.to_iri("HTTP")
244 );
245 assert_eq!(edr.data_address.endpoint_properties.len(), 7);
246
247 assert_eq!(
248 edr.data_address
249 .get_property(&EDC_NAMESPACE.to_iri("access_token")),
250 Some("token")
251 );
252
253 assert_eq!(
254 edr.data_address
255 .get_property(&EDC_NAMESPACE.to_iri("refresh_token")),
256 Some("token")
257 );
258
259 assert_eq!(
260 edr.data_address
261 .get_property(&EDC_NAMESPACE.to_iri("endpoint")),
262 Some(edr_manager.proxy_url.as_ref())
263 );
264
265 assert_eq!(
266 edr.data_address
267 .get_property(&EDC_NAMESPACE.to_iri("jwks_url")),
268 Some(edr_manager.jwks_url.as_ref())
269 );
270
271 assert_eq!(
272 edr.data_address
273 .get_property(&EDC_NAMESPACE.to_iri("expires_in")),
274 Some("3600")
275 );
276 }
277
278 #[tokio::test]
279 async fn test_create_edr_failure() {
280 let mut token_manager = MockTokenManager::new();
281 let store = MockEdrRepo::new();
282
283 token_manager
284 .expect_issue::<EdrClaims>()
285 .returning(|_: &EdrClaims| Err(TokenError::Encode(ErrorKind::InvalidKeyFormat.into())));
286
287 let edr_manager = EdrManager::builder()
288 .proxy_url("http://localhost:8080/public".to_string())
289 .issuer("issuer".to_string())
290 .tokens(token_manager)
291 .token_duration(Duration::days(1))
292 .token_url("http://localhost:8080/token")
293 .jwks_url("http://localhost:8080/.well-known/jwks.json")
294 .store(EdrRepoRef::of(store))
295 .build();
296
297 let req = create_transfer();
298
299 let result = edr_manager.create_edr(&req).await;
300
301 if let Err(EdrError::Token(TokenError::Encode(err))) = result {
302 assert_eq!(err.kind(), &ErrorKind::InvalidKeyFormat);
303 } else {
304 panic!("Wrong type")
305 }
306 }
307
308 fn create_transfer() -> Transfer {
309 Transfer::builder()
310 .participant_id("participant_id".to_string())
311 .id("process_id".to_string())
312 .source(
313 DataAddress::builder()
314 .endpoint_type("MyType".to_string())
315 .endpoint_properties(vec![])
316 .build(),
317 )
318 .status(TransferStatus::Started)
319 .build()
320 }
321}