Skip to main content

aura_composition/adapters/
transport.rs

1//! Transport handler adapter
2
3use crate::adapters::collect_ops;
4use crate::adapters::utils::{deserialize_operation_params, serialize_operation_result};
5use crate::registry::{HandlerContext, HandlerError, RegistrableHandler};
6use async_trait::async_trait;
7use aura_core::effects::{NetworkCoreEffects, NetworkExtendedEffects};
8use aura_core::{EffectType, ExecutionMode};
9use cfg_if::cfg_if;
10use std::sync::Arc;
11
12cfg_if! {
13    if #[cfg(not(target_arch = "wasm32"))] {
14        use aura_effects::TcpTransportHandler as RealTransportHandler;
15    }
16}
17
18/// Adapter for TcpTransportHandler (NetworkEffects implementation)
19pub struct TransportHandlerAdapter {
20    core: Arc<dyn NetworkCoreEffects>,
21    extended: Option<Arc<dyn NetworkExtendedEffects>>,
22}
23
24impl TransportHandlerAdapter {
25    #[cfg(not(target_arch = "wasm32"))]
26    pub fn new(handler: RealTransportHandler) -> Self {
27        let handler = Arc::new(handler);
28        let core: Arc<dyn NetworkCoreEffects> = handler.clone();
29        let extended: Arc<dyn NetworkExtendedEffects> = handler;
30        Self {
31            core,
32            extended: Some(extended),
33        }
34    }
35
36    pub fn new_core(handler: Arc<dyn NetworkCoreEffects>) -> Self {
37        Self {
38            core: handler,
39            extended: None,
40        }
41    }
42
43    pub fn new_extended<T: NetworkExtendedEffects + 'static>(handler: T) -> Self {
44        let handler = Arc::new(handler);
45        let core: Arc<dyn NetworkCoreEffects> = handler.clone();
46        let extended: Arc<dyn NetworkExtendedEffects> = handler;
47        Self {
48            core,
49            extended: Some(extended),
50        }
51    }
52}
53
54#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
55#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
56impl RegistrableHandler for TransportHandlerAdapter {
57    async fn execute_operation_bytes(
58        &self,
59        effect_type: EffectType,
60        operation: &str,
61        parameters: &[u8],
62        _ctx: &HandlerContext,
63    ) -> Result<Vec<u8>, HandlerError> {
64        if effect_type != EffectType::Network {
65            return Err(HandlerError::UnsupportedEffect { effect_type });
66        }
67
68        match operation {
69            "send_to_peer" => {
70                let params: (uuid::Uuid, Vec<u8>) =
71                    deserialize_operation_params(effect_type, operation, parameters)?;
72                self.core
73                    .send_to_peer(params.0, params.1)
74                    .await
75                    .map_err(|e| HandlerError::ExecutionFailed {
76                        source: Box::new(e),
77                    })?;
78                Ok(Vec::new()) // send returns void
79            }
80            "broadcast" => {
81                let message: Vec<u8> =
82                    deserialize_operation_params(effect_type, operation, parameters)?;
83                self.core
84                    .broadcast(message)
85                    .await
86                    .map_err(|e| HandlerError::ExecutionFailed {
87                        source: Box::new(e),
88                    })?;
89                Ok(Vec::new()) // broadcast returns void
90            }
91            "receive" => {
92                let received = NetworkCoreEffects::receive(&self.core).await.map_err(|e| {
93                    HandlerError::ExecutionFailed {
94                        source: Box::new(e),
95                    }
96                })?;
97                serialize_operation_result(effect_type, operation, &received)
98            }
99            "receive_from" => {
100                let handler =
101                    self.extended
102                        .as_ref()
103                        .ok_or_else(|| HandlerError::UnknownOperation {
104                            effect_type,
105                            operation: operation.to_string(),
106                        })?;
107                let peer_id: uuid::Uuid =
108                    deserialize_operation_params(effect_type, operation, parameters)?;
109                let received = handler.receive_from(peer_id).await.map_err(|e| {
110                    HandlerError::ExecutionFailed {
111                        source: Box::new(e),
112                    }
113                })?;
114                serialize_operation_result(effect_type, operation, &received)
115            }
116            "connected_peers" => {
117                let handler =
118                    self.extended
119                        .as_ref()
120                        .ok_or_else(|| HandlerError::UnknownOperation {
121                            effect_type,
122                            operation: operation.to_string(),
123                        })?;
124                let peers = handler.connected_peers().await;
125                serialize_operation_result(effect_type, operation, &peers)
126            }
127            "is_peer_connected" => {
128                let handler =
129                    self.extended
130                        .as_ref()
131                        .ok_or_else(|| HandlerError::UnknownOperation {
132                            effect_type,
133                            operation: operation.to_string(),
134                        })?;
135                let peer_id: uuid::Uuid =
136                    deserialize_operation_params(effect_type, operation, parameters)?;
137                let result = handler.is_peer_connected(peer_id).await;
138                serialize_operation_result(effect_type, operation, &result)
139            }
140            "subscribe_to_peer_events" => Err(HandlerError::ExecutionFailed {
141                source: "Peer event streams are not serializable in registry adapters".into(),
142            }),
143            "open" => {
144                let handler =
145                    self.extended
146                        .as_ref()
147                        .ok_or_else(|| HandlerError::UnknownOperation {
148                            effect_type,
149                            operation: operation.to_string(),
150                        })?;
151                let address: String =
152                    deserialize_operation_params(effect_type, operation, parameters)?;
153                let connection_id =
154                    handler
155                        .open(&address)
156                        .await
157                        .map_err(|e| HandlerError::ExecutionFailed {
158                            source: Box::new(e),
159                        })?;
160                serialize_operation_result(effect_type, operation, &connection_id)
161            }
162            "send" => {
163                let handler =
164                    self.extended
165                        .as_ref()
166                        .ok_or_else(|| HandlerError::UnknownOperation {
167                            effect_type,
168                            operation: operation.to_string(),
169                        })?;
170                let (connection_id, data): (String, Vec<u8>) =
171                    deserialize_operation_params(effect_type, operation, parameters)?;
172                handler.send(&connection_id, data).await.map_err(|e| {
173                    HandlerError::ExecutionFailed {
174                        source: Box::new(e),
175                    }
176                })?;
177                Ok(Vec::new())
178            }
179            "close" => {
180                let handler =
181                    self.extended
182                        .as_ref()
183                        .ok_or_else(|| HandlerError::UnknownOperation {
184                            effect_type,
185                            operation: operation.to_string(),
186                        })?;
187                let connection_id: String =
188                    deserialize_operation_params(effect_type, operation, parameters)?;
189                handler
190                    .close(&connection_id)
191                    .await
192                    .map_err(|e| HandlerError::ExecutionFailed {
193                        source: Box::new(e),
194                    })?;
195                Ok(Vec::new())
196            }
197            _ => Err(HandlerError::UnknownOperation {
198                effect_type,
199                operation: operation.to_string(),
200            }),
201        }
202    }
203
204    fn supported_operations(&self, effect_type: EffectType) -> Vec<String> {
205        collect_ops(effect_type, self.extended.is_some())
206    }
207
208    fn supports_effect(&self, effect_type: EffectType) -> bool {
209        effect_type == EffectType::Network
210    }
211
212    fn execution_mode(&self) -> ExecutionMode {
213        ExecutionMode::Production
214    }
215}