Skip to main content

fp_runtime/asupersync/
transport.rs

1use std::{
2    collections::BTreeMap,
3    sync::{Arc, Mutex},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::asupersync::{
9    codec::EncodedArtifact,
10    config::{AsupersyncConfig, CapabilitySet, CxCapability},
11    error::AsupersyncError,
12    validate_capability_gate,
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TransferStatus {
18    Completed,
19    RetryableFailure,
20    PermanentFailure,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct TransferReport {
25    pub artifact_id: String,
26    pub bytes_transferred: usize,
27    pub status: TransferStatus,
28    pub detail: String,
29}
30
31pub trait TransportLayer {
32    fn send(
33        &self,
34        artifact: EncodedArtifact,
35        config: &AsupersyncConfig,
36    ) -> Result<TransferReport, AsupersyncError>;
37
38    fn receive(
39        &self,
40        artifact_id: &str,
41        config: &AsupersyncConfig,
42    ) -> Result<EncodedArtifact, AsupersyncError>;
43
44    fn required_capabilities(&self) -> CapabilitySet {
45        CapabilitySet::for_capability(CxCapability::Io)
46            .union(CapabilitySet::for_capability(CxCapability::Remote))
47    }
48}
49
50#[derive(Debug, Clone, Default)]
51pub struct InMemoryTransport {
52    storage: Arc<Mutex<BTreeMap<String, EncodedArtifact>>>,
53}
54
55impl InMemoryTransport {
56    #[must_use]
57    pub fn new() -> Self {
58        Self::default()
59    }
60}
61
62impl TransportLayer for InMemoryTransport {
63    fn send(
64        &self,
65        artifact: EncodedArtifact,
66        config: &AsupersyncConfig,
67    ) -> Result<TransferReport, AsupersyncError> {
68        validate_capability_gate(config, self.required_capabilities())?;
69
70        let mut guard = self.storage.lock().map_err(|_| {
71            AsupersyncError::Transport("in-memory transport lock poisoned".to_string())
72        })?;
73        let bytes_transferred = artifact.encoded_bytes.len();
74        let artifact_id = artifact.artifact_id.clone();
75        guard.insert(artifact_id.clone(), artifact);
76
77        Ok(TransferReport {
78            artifact_id,
79            bytes_transferred,
80            status: TransferStatus::Completed,
81            detail: "stored in in-memory transport".to_string(),
82        })
83    }
84
85    fn receive(
86        &self,
87        artifact_id: &str,
88        config: &AsupersyncConfig,
89    ) -> Result<EncodedArtifact, AsupersyncError> {
90        validate_capability_gate(config, self.required_capabilities())?;
91
92        let guard = self.storage.lock().map_err(|_| {
93            AsupersyncError::Transport("in-memory transport lock poisoned".to_string())
94        })?;
95        guard
96            .get(artifact_id)
97            .cloned()
98            .ok_or_else(|| AsupersyncError::ArtifactNotFound(artifact_id.to_string()))
99    }
100}