edc_dataplane_proxy/service/
edr.rs

1use bon::Builder;
2use chrono::{Duration, Utc};
3use edc_dataplane_core::{
4    core::model::{
5        namespace::{EDC_NAMESPACE, IDSA_NAMESPACE},
6        transfer::Transfer,
7    },
8    signaling::{DataAddress, EndpointProperty},
9};
10use thiserror::Error;
11use uuid::Uuid;
12
13use crate::{
14    db::edr::EdrRepoRef,
15    model::{
16        edr::{Edr, EdrClaims, EdrEntry, RefreshTokenId, TokenId},
17        token::{TokenRequest, TokenResponse},
18    },
19};
20
21use super::token::{TokenError, TokenManager};
22
23#[derive(Clone, Builder)]
24pub struct EdrManager<T: TokenManager> {
25    #[builder(into)]
26    pub(crate) proxy_url: String,
27    #[builder(into)]
28    pub(crate) token_url: String,
29    #[builder(into)]
30    issuer: String,
31    #[builder(into)]
32    pub(crate) jwks_url: String,
33    pub(crate) tokens: T,
34    token_duration: Duration,
35    #[builder(default = Duration::days(30))]
36    refresh_token_duration: Duration,
37    store: EdrRepoRef,
38}
39
40impl<T: TokenManager> EdrManager<T> {
41    pub async fn create_edr(&self, req: &Transfer) -> Result<Edr, EdrError> {
42        let token_id: TokenId = Uuid::new_v4().into();
43        let refresh_token_id: RefreshTokenId = Uuid::new_v4().into();
44
45        let data_address = DataAddress::builder()
46            .endpoint_type(IDSA_NAMESPACE.to_iri("HTTP"))
47            .endpoint_properties(self.endpoint_properties(token_id, refresh_token_id, req)?)
48            .build();
49
50        Ok(Edr::builder()
51            .token_id(token_id)
52            .refresh_token_id(refresh_token_id)
53            .data_address(data_address)
54            .build())
55    }
56
57    pub async fn get_by_transfer_id(&self, transfer_id: &str) -> anyhow::Result<Option<EdrEntry>> {
58        self.store.fetch_by_id(transfer_id).await
59    }
60
61    pub async fn save(&self, edr: EdrEntry) -> anyhow::Result<()> {
62        self.store.save(edr).await
63    }
64
65    pub async fn delete(&self, transfer_id: &str) -> anyhow::Result<()> {
66        self.store.delete(transfer_id).await
67    }
68
69    pub async fn refresh_token(&self, req: TokenRequest) -> Result<TokenResponse, EdrError> {
70        let token_id: TokenId = Uuid::new_v4().into();
71        let refresh_token_id: RefreshTokenId = Uuid::new_v4().into();
72
73        let claims = self.tokens.validate::<EdrClaims>(&req.refresh_token)?;
74
75        self.issue_token(
76            token_id,
77            refresh_token_id,
78            &claims.claims.sub,
79            &claims.claims.transfer_id,
80        )
81    }
82
83    fn endpoint_properties(
84        &self,
85        token_id: TokenId,
86        refresh_token_id: RefreshTokenId,
87        transfer: &Transfer,
88    ) -> Result<Vec<EndpointProperty>, EdrError> {
89        let token_response = self.issue_token(
90            token_id,
91            refresh_token_id,
92            &transfer.participant_id,
93            &transfer.id,
94        )?;
95        Ok(vec![
96            EndpointProperty::builder()
97                .name(EDC_NAMESPACE.to_iri("endpoint"))
98                .value(self.proxy_url.clone())
99                .build(),
100            EndpointProperty::builder()
101                .name(EDC_NAMESPACE.to_iri("access_token"))
102                .value(token_response.access_token)
103                .build(),
104            EndpointProperty::builder()
105                .name(EDC_NAMESPACE.to_iri("token_type"))
106                .value("Bearer")
107                .build(),
108            EndpointProperty::builder()
109                .name(EDC_NAMESPACE.to_iri("refresh_token"))
110                .value(token_response.refresh_token)
111                .build(),
112            EndpointProperty::builder()
113                .name(EDC_NAMESPACE.to_iri("refresh_endpoint"))
114                .value(self.token_url.clone())
115                .build(),
116            EndpointProperty::builder()
117                .name(EDC_NAMESPACE.to_iri("expires_in"))
118                .value(token_response.expires_in)
119                .build(),
120            EndpointProperty::builder()
121                .name(EDC_NAMESPACE.to_iri("jwks_url"))
122                .value(self.jwks_url.clone())
123                .build(),
124        ])
125    }
126
127    fn issue_access_token(
128        &self,
129        id: TokenId,
130        participant_id: &str,
131        process_id: &str,
132    ) -> Result<String, EdrError> {
133        self.issue_generic_token(id.into(), participant_id, process_id, self.token_duration)
134    }
135
136    fn issue_generic_token(
137        &self,
138        jti: Uuid,
139        participant_id: &str,
140        process_id: &str,
141        duration: Duration,
142    ) -> Result<String, EdrError> {
143        let now = Utc::now();
144        let exp = now
145            .checked_add_signed(duration)
146            .ok_or_else(|| anyhow::anyhow!("Error adding {}", self.token_duration))
147            .map_err(EdrError::Generic)?;
148
149        let claims = EdrClaims::builder()
150            .jti(jti)
151            .iss(self.issuer.clone())
152            .aud(self.proxy_url.clone())
153            .sub(participant_id.to_string())
154            .exp(exp.timestamp())
155            .iat(now.timestamp())
156            .transfer_id(process_id.to_string())
157            .build();
158
159        self.tokens.issue(&claims).map(Ok)?
160    }
161
162    pub(crate) fn issue_token(
163        &self,
164        token_id: TokenId,
165        refresh_token_id: RefreshTokenId,
166        participant_id: &str,
167        process_id: &str,
168    ) -> Result<TokenResponse, EdrError> {
169        let access_token = self.issue_access_token(token_id, participant_id, process_id)?;
170        let refresh_token =
171            self.issue_refresh_token(refresh_token_id, participant_id, process_id)?;
172
173        Ok(TokenResponse {
174            access_token,
175            refresh_token,
176            expires_in: self.token_duration.num_seconds().to_string(),
177        })
178    }
179
180    fn issue_refresh_token(
181        &self,
182        id: RefreshTokenId,
183        participant_id: &str,
184        process_id: &str,
185    ) -> Result<String, EdrError> {
186        self.issue_generic_token(
187            id.into(),
188            participant_id,
189            process_id,
190            self.refresh_token_duration,
191        )
192    }
193}
194
195#[derive(Error, Debug)]
196pub enum EdrError {
197    #[error("Generic error")]
198    Generic(anyhow::Error),
199    #[error(transparent)]
200    Token(#[from] TokenError),
201}
202
203#[cfg(test)]
204mod tests {
205
206    use super::*;
207    use crate::{db::edr::MockEdrRepo, service::token::MockTokenManager};
208    use chrono::Duration;
209    use edc_dataplane_core::core::model::transfer::TransferStatus;
210    use jsonwebtoken::errors::ErrorKind;
211
212    #[tokio::test]
213    async fn test_create_edr() {
214        let mut token_manager = MockTokenManager::new();
215        let store = MockEdrRepo::new();
216
217        token_manager
218            .expect_issue::<EdrClaims>()
219            .withf(|claims| {
220                claims.iss == "issuer".to_string()
221                    && claims.aud == "http://localhost:8080/public".to_string()
222                    && claims.sub == "participant_id".to_string()
223                    && claims.transfer_id == "process_id".to_string()
224            })
225            .returning(|_: &EdrClaims| Ok("token".to_string()));
226
227        let edr_manager = EdrManager::builder()
228            .proxy_url("http://localhost:8080/public")
229            .issuer("issuer")
230            .tokens(token_manager)
231            .token_duration(Duration::hours(1))
232            .token_url("http://localhost:8080/token")
233            .jwks_url("http://localhost:8080/.well-known/jwks.json")
234            .store(EdrRepoRef::of(store))
235            .build();
236
237        let req = create_transfer();
238
239        let edr = edr_manager.create_edr(&req).await.unwrap();
240
241        assert_eq!(
242            edr.data_address.endpoint_type,
243            IDSA_NAMESPACE.to_iri("HTTP")
244        );
245        assert_eq!(edr.data_address.endpoint_properties.len(), 7);
246
247        assert_eq!(
248            edr.data_address
249                .get_property(&EDC_NAMESPACE.to_iri("access_token")),
250            Some("token")
251        );
252
253        assert_eq!(
254            edr.data_address
255                .get_property(&EDC_NAMESPACE.to_iri("refresh_token")),
256            Some("token")
257        );
258
259        assert_eq!(
260            edr.data_address
261                .get_property(&EDC_NAMESPACE.to_iri("endpoint")),
262            Some(edr_manager.proxy_url.as_ref())
263        );
264
265        assert_eq!(
266            edr.data_address
267                .get_property(&EDC_NAMESPACE.to_iri("jwks_url")),
268            Some(edr_manager.jwks_url.as_ref())
269        );
270
271        assert_eq!(
272            edr.data_address
273                .get_property(&EDC_NAMESPACE.to_iri("expires_in")),
274            Some("3600")
275        );
276    }
277
278    #[tokio::test]
279    async fn test_create_edr_failure() {
280        let mut token_manager = MockTokenManager::new();
281        let store = MockEdrRepo::new();
282
283        token_manager
284            .expect_issue::<EdrClaims>()
285            .returning(|_: &EdrClaims| Err(TokenError::Encode(ErrorKind::InvalidKeyFormat.into())));
286
287        let edr_manager = EdrManager::builder()
288            .proxy_url("http://localhost:8080/public".to_string())
289            .issuer("issuer".to_string())
290            .tokens(token_manager)
291            .token_duration(Duration::days(1))
292            .token_url("http://localhost:8080/token")
293            .jwks_url("http://localhost:8080/.well-known/jwks.json")
294            .store(EdrRepoRef::of(store))
295            .build();
296
297        let req = create_transfer();
298
299        let result = edr_manager.create_edr(&req).await;
300
301        if let Err(EdrError::Token(TokenError::Encode(err))) = result {
302            assert_eq!(err.kind(), &ErrorKind::InvalidKeyFormat);
303        } else {
304            panic!("Wrong type")
305        }
306    }
307
308    fn create_transfer() -> Transfer {
309        Transfer::builder()
310            .participant_id("participant_id".to_string())
311            .id("process_id".to_string())
312            .source(
313                DataAddress::builder()
314                    .endpoint_type("MyType".to_string())
315                    .endpoint_properties(vec![])
316                    .build(),
317            )
318            .status(TransferStatus::Started)
319            .build()
320    }
321}