edc_dataplane_core/core/service/
transfer.rs

1use async_trait::async_trait;
2use miwa::derive::interface;
3
4use miwa::derive::Injectable;
5#[cfg(test)]
6use mockall::{automock, predicate::*};
7use tracing::debug;
8
9use crate::{
10    core::{
11        db::transfer::TransferRepoRef,
12        model::transfer::{Transfer, TransferStatus},
13    },
14    signaling::{DataAddress, DataFlowResponseMessage, DataFlowStartMessage},
15};
16
17#[derive(Clone, Injectable)]
18pub struct TransferService {
19    manager: TransferManagerRef,
20    db: TransferRepoRef,
21}
22
23impl TransferService {
24    pub fn new(manager: TransferManagerRef, db: TransferRepoRef) -> Self {
25        Self { manager, db }
26    }
27
28    pub async fn start(
29        &self,
30        req: DataFlowStartMessage,
31    ) -> anyhow::Result<DataFlowResponseMessage> {
32        let transfer = Transfer::builder()
33            .id(req.process_id.clone())
34            .participant_id(req.participant_id.clone())
35            .source(req.source_data_address)
36            .status(TransferStatus::Started)
37            .build();
38
39        if self.manager.can_handle(&transfer).await? {
40            let address = self.manager.handle_start(&transfer).await?;
41            self.db.save(transfer).await?;
42            Ok(DataFlowResponseMessage::new(address))
43        } else {
44            Err(anyhow::anyhow!("Transfer not supported"))
45        }
46    }
47
48    pub async fn get(&self, id: &str) -> anyhow::Result<Option<Transfer>> {
49        self.db.fetch_by_id(id).await
50    }
51
52    pub async fn suspend(&self, id: String) -> anyhow::Result<()> {
53        debug!("Suspending transfer with id {}", id);
54
55        self.db.change_status(id, TransferStatus::Suspended).await
56    }
57
58    pub async fn terminate(&self, id: String, reason: Option<String>) -> anyhow::Result<()> {
59        debug!(
60            "Terminating transfer with id {} with reason: {:?}",
61            id, reason
62        );
63        self.db.delete(&id).await
64    }
65}
66
67#[async_trait]
68#[interface]
69#[cfg_attr(test, automock)]
70pub trait TransferManager {
71    async fn can_handle(&self, transfer: &Transfer) -> anyhow::Result<bool>;
72    async fn handle_start(&self, transfer: &Transfer) -> anyhow::Result<Option<DataAddress>>;
73    async fn handle_suspend(&self, id: &str) -> anyhow::Result<()>;
74    async fn handle_terminate(&self, id: &str) -> anyhow::Result<()>;
75}
76
#[cfg(test)]
mod tests {
    use futures::FutureExt;
    use std::collections::HashMap;
    use uuid::Uuid;

    use crate::{
        core::{
            db::transfer::{MockTransferRepo, TransferRepoRef},
            model::namespace::{EDC_NAMESPACE, IDSA_NAMESPACE},
        },
        signaling::{DataAddress, DataFlowStartMessage, EndpointProperty, FlowType},
    };

    use super::{MockTransferManager, TransferManagerRef, TransferService};

    /// Happy path: the manager accepts and starts the transfer, the store saves
    /// it, and the data address produced by the manager ends up in the response.
    #[tokio::test]
    async fn start_transfer() {
        let mut transfer_manager = MockTransferManager::new();
        let mut store = MockTransferRepo::new();

        transfer_manager
            .expect_can_handle()
            .returning(|_| futures::future::ok(true).boxed());

        transfer_manager
            .expect_handle_start()
            .returning(|_| futures::future::ok(Some(create_data_address())).boxed());

        store
            .expect_save()
            .returning(|_| Box::pin(async { Ok(()) }));

        let service = create_service(transfer_manager, store);

        let req = create_req();

        let data_address = service
            .start(req)
            .await
            .unwrap()
            .data_address
            .expect("Data address is missing");

        assert_eq!(data_address.endpoint_type, IDSA_NAMESPACE.to_iri("HTTP"));
        assert_eq!(data_address.endpoint_properties.len(), 0);
    }

    /// An error returned by the store while saving is surfaced by `start`.
    #[tokio::test]
    async fn start_transfer_fails_when_store_fails() {
        let mut transfer_manager = MockTransferManager::new();
        let mut store = MockTransferRepo::new();

        transfer_manager
            .expect_can_handle()
            .returning(|_| futures::future::ok(true).boxed());

        transfer_manager
            .expect_handle_start()
            .returning(|_| futures::future::ok(Some(create_data_address())).boxed());

        store
            .expect_save()
            .returning(|_| Box::pin(async { Err(anyhow::anyhow!("Failed to save")) }));

        let service = create_service(transfer_manager, store);

        let req = create_req();

        let result = service.start(req).await.unwrap_err();

        assert_eq!(result.to_string(), "Failed to save");
    }

    /// An error returned by the manager while starting is surfaced by `start`.
    #[tokio::test]
    async fn start_transfer_fails_when_manager_fails() {
        let mut transfer_manager = MockTransferManager::new();
        // No expectations are set on the store: with mockall an unexpected call
        // panics, so this also checks that nothing is saved when the start fails.
        let store = MockTransferRepo::new();

        transfer_manager
            .expect_can_handle()
            .returning(|_| futures::future::ok(true).boxed());

        transfer_manager
            .expect_handle_start()
            .returning(|_| futures::future::err(anyhow::anyhow!("Failed to handle start")).boxed());

        let service = create_service(transfer_manager, store);

        let req = create_req();

        let result = service.start(req).await.unwrap_err();

        assert_eq!(result.to_string(), "Failed to handle start");
    }

    /// Wraps the two mocks in their reference types and builds the service under test.
    fn create_service(mock: MockTransferManager, mock_store: MockTransferRepo) -> TransferService {
        let manager = TransferManagerRef::of(mock);
        let store = TransferRepoRef::of(mock_store);

        TransferService::new(manager, store)
    }

    /// Data address handed back by the mocked manager: IDSA `HTTP` endpoint type
    /// with no endpoint properties.
    fn create_data_address() -> DataAddress {
        DataAddress::builder()
            .endpoint_type(IDSA_NAMESPACE.to_iri("HTTP"))
            .endpoint_properties(vec![])
            .build()
    }

    /// Pull-flow start request with fixed participant/process ids, an `HttpData`
    /// source pointing at `http://localhost:8080`, and random dataset and
    /// agreement ids.
    fn create_req() -> DataFlowStartMessage {
        DataFlowStartMessage::builder()
            .participant_id("participant_id".to_string())
            .process_id("process_id".to_string())
            .source_data_address(
                DataAddress::builder()
                    .endpoint_type("HttpData".to_string())
                    .endpoint_properties(vec![EndpointProperty::builder()
                        .name(EDC_NAMESPACE.to_iri("baseUrl"))
                        .value("http://localhost:8080")
                        .build()])
                    .build(),
            )
            .properties(HashMap::new())
            .flow_type(FlowType::Pull)
            .dataset_id(Uuid::new_v4().to_string())
            .agreement_id(Uuid::new_v4().to_string())
            .build()
    }
}