edc_dataplane_proxy/
manager.rs

1use async_trait::async_trait;
2use edc_dataplane_core::{
3    core::{
4        model::transfer::{types::TransferKind, Transfer},
5        service::transfer::TransferManager,
6    },
7    signaling::DataAddress,
8};
9
10use crate::{
11    db::edr::EdrRepoRef,
12    model::edr::EdrEntry,
13    service::{edr::EdrManager, token::TokenManager},
14};
15
16pub struct TransferProxyManager<T: TokenManager> {
17    edrs: EdrManager<T>,
18    tokens: EdrRepoRef,
19}
20
21impl<T: TokenManager> TransferProxyManager<T> {
22    pub fn new(edrs: EdrManager<T>, tokens: EdrRepoRef) -> Self {
23        Self { edrs, tokens }
24    }
25}
26
27#[async_trait]
28impl<T: TokenManager + Send + Sync + 'static> TransferManager for TransferProxyManager<T> {
29    async fn can_handle(&self, transfer: &Transfer) -> anyhow::Result<bool> {
30        let _ = TransferKind::try_from(&transfer.source.0)?;
31
32        Ok(true)
33    }
34
35    async fn handle_start(&self, transfer: &Transfer) -> anyhow::Result<Option<DataAddress>> {
36        let edr = self.edrs.create_edr(transfer).await?;
37
38        let entry = EdrEntry::builder()
39            .transfer_id(transfer.id.clone())
40            .refresh_token_id(edr.refresh_token_id)
41            .token_id(edr.token_id)
42            .build();
43
44        self.tokens.save(entry).await?;
45
46        Ok(Some(edr.data_address))
47    }
48
49    async fn handle_suspend(&self, _id: &str) -> anyhow::Result<()> {
50        // todo handle suspend
51        Ok(())
52    }
53    async fn handle_terminate(&self, id: &str) -> anyhow::Result<()> {
54        self.edrs.delete(id).await
55    }
56}