// edc_dataplane_core/core/service/transfer.rs
use async_trait::async_trait;
2use miwa::derive::interface;
3
4use miwa::derive::Injectable;
5#[cfg(test)]
6use mockall::{automock, predicate::*};
7use tracing::debug;
8
9use crate::{
10 core::{
11 db::transfer::TransferRepoRef,
12 model::transfer::{Transfer, TransferStatus},
13 },
14 signaling::{DataAddress, DataFlowResponseMessage, DataFlowStartMessage},
15};
16
17#[derive(Clone, Injectable)]
18pub struct TransferService {
19 manager: TransferManagerRef,
20 db: TransferRepoRef,
21}
22
23impl TransferService {
24 pub fn new(manager: TransferManagerRef, db: TransferRepoRef) -> Self {
25 Self { manager, db }
26 }
27
28 pub async fn start(
29 &self,
30 req: DataFlowStartMessage,
31 ) -> anyhow::Result<DataFlowResponseMessage> {
32 let transfer = Transfer::builder()
33 .id(req.process_id.clone())
34 .participant_id(req.participant_id.clone())
35 .source(req.source_data_address)
36 .status(TransferStatus::Started)
37 .build();
38
39 if self.manager.can_handle(&transfer).await? {
40 let address = self.manager.handle_start(&transfer).await?;
41 self.db.save(transfer).await?;
42 Ok(DataFlowResponseMessage::new(address))
43 } else {
44 Err(anyhow::anyhow!("Transfer not supported"))
45 }
46 }
47
48 pub async fn get(&self, id: &str) -> anyhow::Result<Option<Transfer>> {
49 self.db.fetch_by_id(id).await
50 }
51
52 pub async fn suspend(&self, id: String) -> anyhow::Result<()> {
53 debug!("Suspending transfer with id {}", id);
54
55 self.db.change_status(id, TransferStatus::Suspended).await
56 }
57
58 pub async fn terminate(&self, id: String, reason: Option<String>) -> anyhow::Result<()> {
59 debug!(
60 "Terminating transfer with id {} with reason: {:?}",
61 id, reason
62 );
63 self.db.delete(&id).await
64 }
65}
66
67#[async_trait]
68#[interface]
69#[cfg_attr(test, automock)]
70pub trait TransferManager {
71 async fn can_handle(&self, transfer: &Transfer) -> anyhow::Result<bool>;
72 async fn handle_start(&self, transfer: &Transfer) -> anyhow::Result<Option<DataAddress>>;
73 async fn handle_suspend(&self, id: &str) -> anyhow::Result<()>;
74 async fn handle_terminate(&self, id: &str) -> anyhow::Result<()>;
75}
76
#[cfg(test)]
mod tests {
    use futures::FutureExt;
    use std::collections::HashMap;
    use uuid::Uuid;

    use crate::{
        core::{
            db::transfer::{MockTransferRepo, TransferRepoRef},
            model::namespace::{EDC_NAMESPACE, IDSA_NAMESPACE},
        },
        signaling::{DataAddress, DataFlowStartMessage, EndpointProperty, FlowType},
    };

    use super::{MockTransferManager, TransferManagerRef, TransferService};

    /// Happy path: the manager accepts and starts the transfer, the repository
    /// saves it, and the manager's data address is returned to the caller.
    #[tokio::test]
    async fn start_transfer() {
        let mut manager_mock = accepting_manager();
        manager_mock
            .expect_handle_start()
            .returning(|_| futures::future::ok(Some(create_data_address())).boxed());

        let mut repo_mock = MockTransferRepo::new();
        repo_mock
            .expect_save()
            .returning(|_| Box::pin(async { Ok(()) }));

        let service = create_service(manager_mock, repo_mock);

        let response = service.start(create_req()).await.unwrap();
        let data_address = response.data_address.expect("Data address is missing");

        assert_eq!(data_address.endpoint_type, IDSA_NAMESPACE.to_iri("HTTP"));
        assert_eq!(data_address.endpoint_properties.len(), 0);
    }

    /// A repository failure while saving is surfaced to the caller unchanged.
    #[tokio::test]
    async fn start_transfer_fails_when_store_fails() {
        let mut manager_mock = accepting_manager();
        manager_mock
            .expect_handle_start()
            .returning(|_| futures::future::ok(Some(create_data_address())).boxed());

        let mut repo_mock = MockTransferRepo::new();
        repo_mock
            .expect_save()
            .returning(|_| Box::pin(async { Err(anyhow::anyhow!("Failed to save")) }));

        let service = create_service(manager_mock, repo_mock);

        let error = service.start(create_req()).await.unwrap_err();

        assert_eq!(error.to_string(), "Failed to save");
    }

    /// A manager failure in `handle_start` is surfaced to the caller; the
    /// repository mock has no expectations, so any call to it would panic.
    #[tokio::test]
    async fn start_transfer_fails_when_manager_fails() {
        let mut manager_mock = accepting_manager();
        manager_mock
            .expect_handle_start()
            .returning(|_| futures::future::err(anyhow::anyhow!("Failed to handle start")).boxed());

        let repo_mock = MockTransferRepo::new();

        let service = create_service(manager_mock, repo_mock);

        let error = service.start(create_req()).await.unwrap_err();

        assert_eq!(error.to_string(), "Failed to handle start");
    }

    /// Manager mock whose `can_handle` always resolves to `Ok(true)`.
    fn accepting_manager() -> MockTransferManager {
        let mut mock = MockTransferManager::new();
        mock.expect_can_handle()
            .returning(|_| futures::future::ok(true).boxed());
        mock
    }

    /// Wraps both mocks in their handle types and builds the service under test.
    fn create_service(mock: MockTransferManager, mock_store: MockTransferRepo) -> TransferService {
        TransferService::new(
            TransferManagerRef::of(mock),
            TransferRepoRef::of(mock_store),
        )
    }

    /// Data address handed back by the mocked `handle_start`.
    fn create_data_address() -> DataAddress {
        DataAddress::builder()
            .endpoint_type(IDSA_NAMESPACE.to_iri("HTTP"))
            .endpoint_properties(vec![])
            .build()
    }

    /// Start request with fixed ids, a single `baseUrl` endpoint property and
    /// random dataset/agreement ids.
    fn create_req() -> DataFlowStartMessage {
        let base_url = EndpointProperty::builder()
            .name(EDC_NAMESPACE.to_iri("baseUrl"))
            .value("http://localhost:8080")
            .build();

        let source = DataAddress::builder()
            .endpoint_type("HttpData".to_string())
            .endpoint_properties(vec![base_url])
            .build();

        DataFlowStartMessage::builder()
            .participant_id("participant_id".to_string())
            .process_id("process_id".to_string())
            .source_data_address(source)
            .properties(HashMap::new())
            .flow_type(FlowType::Pull)
            .dataset_id(Uuid::new_v4().to_string())
            .agreement_id(Uuid::new_v4().to_string())
            .build()
    }
}