fp_runtime/asupersync/
transport.rs1use std::{
2 collections::BTreeMap,
3 sync::{Arc, Mutex},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::asupersync::{
9 codec::EncodedArtifact,
10 config::{AsupersyncConfig, CapabilitySet, CxCapability},
11 error::AsupersyncError,
12 validate_capability_gate,
13};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TransferStatus {
18 Completed,
19 RetryableFailure,
20 PermanentFailure,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub struct TransferReport {
25 pub artifact_id: String,
26 pub bytes_transferred: usize,
27 pub status: TransferStatus,
28 pub detail: String,
29}
30
31pub trait TransportLayer {
32 fn send(
33 &self,
34 artifact: EncodedArtifact,
35 config: &AsupersyncConfig,
36 ) -> Result<TransferReport, AsupersyncError>;
37
38 fn receive(
39 &self,
40 artifact_id: &str,
41 config: &AsupersyncConfig,
42 ) -> Result<EncodedArtifact, AsupersyncError>;
43
44 fn required_capabilities(&self) -> CapabilitySet {
45 CapabilitySet::for_capability(CxCapability::Io)
46 .union(CapabilitySet::for_capability(CxCapability::Remote))
47 }
48}
49
50#[derive(Debug, Clone, Default)]
51pub struct InMemoryTransport {
52 storage: Arc<Mutex<BTreeMap<String, EncodedArtifact>>>,
53}
54
55impl InMemoryTransport {
56 #[must_use]
57 pub fn new() -> Self {
58 Self::default()
59 }
60}
61
62impl TransportLayer for InMemoryTransport {
63 fn send(
64 &self,
65 artifact: EncodedArtifact,
66 config: &AsupersyncConfig,
67 ) -> Result<TransferReport, AsupersyncError> {
68 validate_capability_gate(config, self.required_capabilities())?;
69
70 let mut guard = self.storage.lock().map_err(|_| {
71 AsupersyncError::Transport("in-memory transport lock poisoned".to_string())
72 })?;
73 let bytes_transferred = artifact.encoded_bytes.len();
74 let artifact_id = artifact.artifact_id.clone();
75 guard.insert(artifact_id.clone(), artifact);
76
77 Ok(TransferReport {
78 artifact_id,
79 bytes_transferred,
80 status: TransferStatus::Completed,
81 detail: "stored in in-memory transport".to_string(),
82 })
83 }
84
85 fn receive(
86 &self,
87 artifact_id: &str,
88 config: &AsupersyncConfig,
89 ) -> Result<EncodedArtifact, AsupersyncError> {
90 validate_capability_gate(config, self.required_capabilities())?;
91
92 let guard = self.storage.lock().map_err(|_| {
93 AsupersyncError::Transport("in-memory transport lock poisoned".to_string())
94 })?;
95 guard
96 .get(artifact_id)
97 .cloned()
98 .ok_or_else(|| AsupersyncError::ArtifactNotFound(artifact_id.to_string()))
99 }
100}