use super::contract::{ContractState, DataContract};
use super::lineage::{Activity, Agent, LineageRecord, ProvenanceGraph};
use super::policy::{EvaluationContext, OdrlAction, PolicyDecision, PolicyEngine};
use super::residency::{GdprComplianceChecker, Region, TransferRecord};
use super::types::{IdsError, IdsResult, IdsUri, Party, TransferProtocol};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TransferStatus {
Pending,
InProgress,
Completed,
Failed,
Cancelled,
Suspended,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TransferType {
Push,
Pull,
Streaming,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferRequest {
pub id: String,
pub contract_id: IdsUri,
pub resource_id: IdsUri,
pub transfer_type: TransferType,
pub protocol: TransferProtocol,
pub source_endpoint: Option<String>,
pub destination_endpoint: String,
pub requested_at: DateTime<Utc>,
pub requestor: Party,
pub properties: HashMap<String, String>,
}
impl TransferRequest {
pub fn new(
contract_id: IdsUri,
resource_id: IdsUri,
destination_endpoint: impl Into<String>,
requestor: Party,
) -> Self {
Self {
id: format!("transfer-{}", Utc::now().timestamp_nanos_opt().unwrap_or(0)),
contract_id,
resource_id,
transfer_type: TransferType::Push,
protocol: TransferProtocol::Https,
source_endpoint: None,
destination_endpoint: destination_endpoint.into(),
requested_at: Utc::now(),
requestor,
properties: HashMap::new(),
}
}
pub fn with_transfer_type(mut self, transfer_type: TransferType) -> Self {
self.transfer_type = transfer_type;
self
}
pub fn with_protocol(mut self, protocol: TransferProtocol) -> Self {
self.protocol = protocol;
self
}
pub fn with_source_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.source_endpoint = Some(endpoint.into());
self
}
pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.properties.insert(key.into(), value.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferResult {
pub request_id: String,
pub status: TransferStatus,
pub bytes_transferred: u64,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
pub error: Option<String>,
pub lineage_record_id: Option<String>,
pub checksum: Option<String>,
}
impl TransferResult {
pub fn pending(request_id: impl Into<String>) -> Self {
Self {
request_id: request_id.into(),
status: TransferStatus::Pending,
bytes_transferred: 0,
started_at: None,
completed_at: None,
error: None,
lineage_record_id: None,
checksum: None,
}
}
pub fn completed(
request_id: impl Into<String>,
bytes_transferred: u64,
started_at: DateTime<Utc>,
checksum: impl Into<String>,
) -> Self {
Self {
request_id: request_id.into(),
status: TransferStatus::Completed,
bytes_transferred,
started_at: Some(started_at),
completed_at: Some(Utc::now()),
error: None,
lineage_record_id: None,
checksum: Some(checksum.into()),
}
}
pub fn failed(request_id: impl Into<String>, error: impl Into<String>) -> Self {
Self {
request_id: request_id.into(),
status: TransferStatus::Failed,
bytes_transferred: 0,
started_at: None,
completed_at: Some(Utc::now()),
error: Some(error.into()),
lineage_record_id: None,
checksum: None,
}
}
pub fn with_lineage_record(mut self, record_id: impl Into<String>) -> Self {
self.lineage_record_id = Some(record_id.into());
self
}
}
#[derive(Debug, Clone)]
pub struct TransferProcess {
pub request: TransferRequest,
pub status: TransferStatus,
pub bytes_transferred: u64,
pub started_at: Option<DateTime<Utc>>,
pub updated_at: DateTime<Utc>,
pub error: Option<String>,
}
pub struct DataPlaneManager {
policy_engine: Arc<PolicyEngine>,
provenance: Arc<ProvenanceGraph>,
gdpr_checker: Arc<GdprComplianceChecker>,
transfers: Arc<RwLock<HashMap<String, TransferProcess>>>,
history: Arc<RwLock<Vec<TransferResult>>>,
connector_id: IdsUri,
}
impl DataPlaneManager {
pub fn new(
connector_id: IdsUri,
policy_engine: Arc<PolicyEngine>,
provenance: Arc<ProvenanceGraph>,
) -> Self {
Self {
policy_engine,
provenance,
gdpr_checker: Arc::new(GdprComplianceChecker::new()),
transfers: Arc::new(RwLock::new(HashMap::new())),
history: Arc::new(RwLock::new(Vec::new())),
connector_id,
}
}
pub async fn initiate_transfer(
&self,
request: TransferRequest,
contract: &DataContract,
) -> IdsResult<TransferResult> {
if !contract.state.is_active() {
return Err(IdsError::ContractNotAgreed(format!(
"Contract {} is not in active state (current: {:?})",
contract.contract_id, contract.state
)));
}
if let Some(end) = contract.contract_end {
if end < Utc::now() {
return Err(IdsError::ContractExpired(format!(
"Contract {} expired at {}",
contract.contract_id, end
)));
}
}
if contract.target_asset.asset_id != request.resource_id {
return Err(IdsError::PolicyViolation(format!(
"Resource {} is not covered by contract {} (expected: {})",
request.resource_id, contract.contract_id, contract.target_asset.asset_id
)));
}
let context = EvaluationContext::new()
.with_requestor(request.requestor.clone())
.with_resource(request.resource_id.clone())
.with_connector_id(self.connector_id.as_str());
let decision = self
.policy_engine
.evaluate(&contract.usage_policy.uid, &OdrlAction::Read, &context)
.await?;
match decision {
PolicyDecision::Deny { reason, .. } => {
return Err(IdsError::PolicyViolation(format!(
"Transfer denied by policy: {}",
reason
)));
}
PolicyDecision::NotApplicable => {
return Err(IdsError::PolicyViolation(
"No applicable policy found for transfer".to_string(),
));
}
PolicyDecision::Permit { duties, .. } => {
if !duties.is_empty() {
tracing::info!(
"Transfer {} has {} duties to fulfill",
request.id,
duties.len()
);
}
}
}
let process = TransferProcess {
request: request.clone(),
status: TransferStatus::Pending,
bytes_transferred: 0,
started_at: None,
updated_at: Utc::now(),
error: None,
};
{
let mut transfers = self.transfers.write().await;
transfers.insert(request.id.clone(), process);
}
let result = self.execute_transfer(&request).await?;
let lineage_record = self.record_transfer_provenance(&request, &result).await?;
let result = result.with_lineage_record(lineage_record);
{
let mut history = self.history.write().await;
history.push(result.clone());
}
{
let mut transfers = self.transfers.write().await;
transfers.remove(&request.id);
}
Ok(result)
}
async fn execute_transfer(&self, request: &TransferRequest) -> IdsResult<TransferResult> {
let started_at = Utc::now();
{
let mut transfers = self.transfers.write().await;
if let Some(process) = transfers.get_mut(&request.id) {
process.status = TransferStatus::InProgress;
process.started_at = Some(started_at);
process.updated_at = Utc::now();
}
}
match request.protocol {
TransferProtocol::Https => self.execute_https_transfer(request, started_at).await,
TransferProtocol::Idscp2 => self.execute_idscp2_transfer(request, started_at).await,
TransferProtocol::MultipartFormData => {
self.execute_multipart_transfer(request, started_at).await
}
TransferProtocol::S3 => self.execute_s3_transfer(request, started_at).await,
TransferProtocol::Kafka => self.execute_kafka_transfer(request, started_at).await,
TransferProtocol::Nats => self.execute_nats_transfer(request, started_at).await,
}
}
async fn execute_nats_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 100; let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"NATS transfer {} completed: {} bytes to subject {}",
request.id,
bytes_transferred,
request.destination_endpoint
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn execute_https_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 1024; let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"HTTPS transfer {} completed: {} bytes to {}",
request.id,
bytes_transferred,
request.destination_endpoint
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn execute_idscp2_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 1024;
let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"IDSCP2 transfer {} completed: {} bytes",
request.id,
bytes_transferred
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn execute_multipart_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 1024;
let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"Multipart transfer {} completed: {} bytes",
request.id,
bytes_transferred
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn execute_s3_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 1024 * 10; let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"S3 transfer {} completed: {} bytes to {}",
request.id,
bytes_transferred,
request.destination_endpoint
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn execute_kafka_transfer(
&self,
request: &TransferRequest,
started_at: DateTime<Utc>,
) -> IdsResult<TransferResult> {
let bytes_transferred = 1024 * 100; let checksum = format!("sha256:{}", hex::encode([0u8; 32]));
tracing::info!(
"Kafka transfer {} completed: {} bytes to topic {}",
request.id,
bytes_transferred,
request.destination_endpoint
);
Ok(TransferResult::completed(
&request.id,
bytes_transferred,
started_at,
checksum,
))
}
async fn record_transfer_provenance(
&self,
request: &TransferRequest,
result: &TransferResult,
) -> IdsResult<String> {
let activity_id = IdsUri::new(format!("urn:ids:activity:transfer:{}", request.id))
.map_err(|e| {
IdsError::InternalError(format!("Failed to create activity URI: {}", e))
})?;
let activity = Activity::completed(
activity_id,
"ids:DataTransfer",
result.started_at.unwrap_or_else(Utc::now),
result.completed_at.unwrap_or_else(Utc::now),
);
let agent = Agent::software(self.connector_id.clone(), "OxiRS IDS Connector");
let record = LineageRecord::new(request.resource_id.clone())
.with_activity(activity)
.with_agent(agent);
let record_id = record.entity.as_str().to_string();
self.provenance.record_lineage(record).await?;
Ok(record_id)
}
pub async fn get_transfer_status(&self, transfer_id: &str) -> Option<TransferStatus> {
let transfers = self.transfers.read().await;
transfers.get(transfer_id).map(|p| p.status)
}
pub async fn get_active_transfers(&self) -> Vec<TransferProcess> {
let transfers = self.transfers.read().await;
transfers.values().cloned().collect()
}
pub async fn get_transfer_history(&self) -> Vec<TransferResult> {
let history = self.history.read().await;
history.clone()
}
pub async fn get_transfers_for_resource(&self, resource_id: &IdsUri) -> Vec<TransferResult> {
let history = self.history.read().await;
history
.iter()
.filter(|r| {
r.request_id.contains(resource_id.as_str())
})
.cloned()
.collect()
}
pub async fn cancel_transfer(&self, transfer_id: &str) -> IdsResult<TransferResult> {
let mut transfers = self.transfers.write().await;
if let Some(process) = transfers.get_mut(transfer_id) {
if process.status == TransferStatus::Pending
|| process.status == TransferStatus::InProgress
{
process.status = TransferStatus::Cancelled;
process.updated_at = Utc::now();
let result = TransferResult {
request_id: transfer_id.to_string(),
status: TransferStatus::Cancelled,
bytes_transferred: process.bytes_transferred,
started_at: process.started_at,
completed_at: Some(Utc::now()),
error: Some("Transfer cancelled by user".to_string()),
lineage_record_id: None,
checksum: None,
};
let mut history = self.history.write().await;
history.push(result.clone());
return Ok(result);
}
}
Err(IdsError::NotFound(format!(
"Transfer {} not found or not cancellable",
transfer_id
)))
}
pub async fn suspend_transfer(
&self,
transfer_id: &str,
reason: impl Into<String>,
) -> IdsResult<()> {
let mut transfers = self.transfers.write().await;
if let Some(process) = transfers.get_mut(transfer_id) {
process.status = TransferStatus::Suspended;
process.error = Some(reason.into());
process.updated_at = Utc::now();
Ok(())
} else {
Err(IdsError::NotFound(format!(
"Transfer {} not found",
transfer_id
)))
}
}
pub async fn check_gdpr_compliance(
&self,
from_region: &Region,
to_region: &Region,
from_org: Option<&str>,
to_org: Option<&str>,
) -> IdsResult<bool> {
let result = self
.gdpr_checker
.check_transfer_compliance(from_region, to_region, from_org, to_org)
.await?;
if !result.compliant {
tracing::warn!("GDPR non-compliance: {:?}", result.non_compliance_reasons);
}
Ok(result.compliant)
}
}
pub struct StreamTransferAdapter {
data_plane: Arc<DataPlaneManager>,
}
impl StreamTransferAdapter {
pub fn new(data_plane: Arc<DataPlaneManager>) -> Self {
Self { data_plane }
}
pub async fn stream_via_kafka(
&self,
request: TransferRequest,
contract: &DataContract,
topic: &str,
) -> IdsResult<TransferResult> {
let request = request
.with_protocol(TransferProtocol::Kafka)
.with_property("kafka.topic", topic);
self.data_plane.initiate_transfer(request, contract).await
}
pub async fn transfer_via_s3(
&self,
request: TransferRequest,
contract: &DataContract,
bucket: &str,
key: &str,
) -> IdsResult<TransferResult> {
let request = request
.with_protocol(TransferProtocol::S3)
.with_property("s3.bucket", bucket)
.with_property("s3.key", key);
self.data_plane.initiate_transfer(request, contract).await
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_party() -> Party {
Party {
id: IdsUri::new("https://example.org/party/consumer").expect("valid uri"),
name: "Test Consumer".to_string(),
legal_name: None,
description: None,
contact: None,
gaiax_participant_id: None,
}
}
#[tokio::test]
async fn test_transfer_request_creation() {
let request = TransferRequest::new(
IdsUri::new("https://example.org/contract/1").expect("valid uri"),
IdsUri::new("https://example.org/data/resource1").expect("valid uri"),
"https://consumer.example.org/receive",
test_party(),
)
.with_protocol(TransferProtocol::Https)
.with_transfer_type(TransferType::Push);
assert_eq!(request.protocol, TransferProtocol::Https);
assert_eq!(request.transfer_type, TransferType::Push);
}
#[tokio::test]
async fn test_transfer_result_states() {
let pending = TransferResult::pending("test-1");
assert_eq!(pending.status, TransferStatus::Pending);
let completed = TransferResult::completed("test-2", 1024, Utc::now(), "sha256:abc123");
assert_eq!(completed.status, TransferStatus::Completed);
assert_eq!(completed.bytes_transferred, 1024);
let failed = TransferResult::failed("test-3", "Connection refused");
assert_eq!(failed.status, TransferStatus::Failed);
assert!(failed.error.is_some());
}
#[tokio::test]
async fn test_data_plane_manager_creation() {
let connector_id = IdsUri::new("urn:ids:connector:test").expect("valid uri");
let policy_engine = Arc::new(PolicyEngine::new());
let provenance = Arc::new(ProvenanceGraph::default());
let manager = DataPlaneManager::new(connector_id, policy_engine, provenance);
let active = manager.get_active_transfers().await;
assert!(active.is_empty());
}
}