aura_composition/adapters/
transport.rs1use crate::adapters::collect_ops;
4use crate::adapters::utils::{deserialize_operation_params, serialize_operation_result};
5use crate::registry::{HandlerContext, HandlerError, RegistrableHandler};
6use async_trait::async_trait;
7use aura_core::effects::{NetworkCoreEffects, NetworkExtendedEffects};
8use aura_core::{EffectType, ExecutionMode};
9use cfg_if::cfg_if;
10use std::sync::Arc;
11
12cfg_if! {
13 if #[cfg(not(target_arch = "wasm32"))] {
14 use aura_effects::TcpTransportHandler as RealTransportHandler;
15 }
16}
17
18pub struct TransportHandlerAdapter {
20 core: Arc<dyn NetworkCoreEffects>,
21 extended: Option<Arc<dyn NetworkExtendedEffects>>,
22}
23
24impl TransportHandlerAdapter {
25 #[cfg(not(target_arch = "wasm32"))]
26 pub fn new(handler: RealTransportHandler) -> Self {
27 let handler = Arc::new(handler);
28 let core: Arc<dyn NetworkCoreEffects> = handler.clone();
29 let extended: Arc<dyn NetworkExtendedEffects> = handler;
30 Self {
31 core,
32 extended: Some(extended),
33 }
34 }
35
36 pub fn new_core(handler: Arc<dyn NetworkCoreEffects>) -> Self {
37 Self {
38 core: handler,
39 extended: None,
40 }
41 }
42
43 pub fn new_extended<T: NetworkExtendedEffects + 'static>(handler: T) -> Self {
44 let handler = Arc::new(handler);
45 let core: Arc<dyn NetworkCoreEffects> = handler.clone();
46 let extended: Arc<dyn NetworkExtendedEffects> = handler;
47 Self {
48 core,
49 extended: Some(extended),
50 }
51 }
52}
53
54#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
55#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
56impl RegistrableHandler for TransportHandlerAdapter {
57 async fn execute_operation_bytes(
58 &self,
59 effect_type: EffectType,
60 operation: &str,
61 parameters: &[u8],
62 _ctx: &HandlerContext,
63 ) -> Result<Vec<u8>, HandlerError> {
64 if effect_type != EffectType::Network {
65 return Err(HandlerError::UnsupportedEffect { effect_type });
66 }
67
68 match operation {
69 "send_to_peer" => {
70 let params: (uuid::Uuid, Vec<u8>) =
71 deserialize_operation_params(effect_type, operation, parameters)?;
72 self.core
73 .send_to_peer(params.0, params.1)
74 .await
75 .map_err(|e| HandlerError::ExecutionFailed {
76 source: Box::new(e),
77 })?;
78 Ok(Vec::new()) }
80 "broadcast" => {
81 let message: Vec<u8> =
82 deserialize_operation_params(effect_type, operation, parameters)?;
83 self.core
84 .broadcast(message)
85 .await
86 .map_err(|e| HandlerError::ExecutionFailed {
87 source: Box::new(e),
88 })?;
89 Ok(Vec::new()) }
91 "receive" => {
92 let received = NetworkCoreEffects::receive(&self.core).await.map_err(|e| {
93 HandlerError::ExecutionFailed {
94 source: Box::new(e),
95 }
96 })?;
97 serialize_operation_result(effect_type, operation, &received)
98 }
99 "receive_from" => {
100 let handler =
101 self.extended
102 .as_ref()
103 .ok_or_else(|| HandlerError::UnknownOperation {
104 effect_type,
105 operation: operation.to_string(),
106 })?;
107 let peer_id: uuid::Uuid =
108 deserialize_operation_params(effect_type, operation, parameters)?;
109 let received = handler.receive_from(peer_id).await.map_err(|e| {
110 HandlerError::ExecutionFailed {
111 source: Box::new(e),
112 }
113 })?;
114 serialize_operation_result(effect_type, operation, &received)
115 }
116 "connected_peers" => {
117 let handler =
118 self.extended
119 .as_ref()
120 .ok_or_else(|| HandlerError::UnknownOperation {
121 effect_type,
122 operation: operation.to_string(),
123 })?;
124 let peers = handler.connected_peers().await;
125 serialize_operation_result(effect_type, operation, &peers)
126 }
127 "is_peer_connected" => {
128 let handler =
129 self.extended
130 .as_ref()
131 .ok_or_else(|| HandlerError::UnknownOperation {
132 effect_type,
133 operation: operation.to_string(),
134 })?;
135 let peer_id: uuid::Uuid =
136 deserialize_operation_params(effect_type, operation, parameters)?;
137 let result = handler.is_peer_connected(peer_id).await;
138 serialize_operation_result(effect_type, operation, &result)
139 }
140 "subscribe_to_peer_events" => Err(HandlerError::ExecutionFailed {
141 source: "Peer event streams are not serializable in registry adapters".into(),
142 }),
143 "open" => {
144 let handler =
145 self.extended
146 .as_ref()
147 .ok_or_else(|| HandlerError::UnknownOperation {
148 effect_type,
149 operation: operation.to_string(),
150 })?;
151 let address: String =
152 deserialize_operation_params(effect_type, operation, parameters)?;
153 let connection_id =
154 handler
155 .open(&address)
156 .await
157 .map_err(|e| HandlerError::ExecutionFailed {
158 source: Box::new(e),
159 })?;
160 serialize_operation_result(effect_type, operation, &connection_id)
161 }
162 "send" => {
163 let handler =
164 self.extended
165 .as_ref()
166 .ok_or_else(|| HandlerError::UnknownOperation {
167 effect_type,
168 operation: operation.to_string(),
169 })?;
170 let (connection_id, data): (String, Vec<u8>) =
171 deserialize_operation_params(effect_type, operation, parameters)?;
172 handler.send(&connection_id, data).await.map_err(|e| {
173 HandlerError::ExecutionFailed {
174 source: Box::new(e),
175 }
176 })?;
177 Ok(Vec::new())
178 }
179 "close" => {
180 let handler =
181 self.extended
182 .as_ref()
183 .ok_or_else(|| HandlerError::UnknownOperation {
184 effect_type,
185 operation: operation.to_string(),
186 })?;
187 let connection_id: String =
188 deserialize_operation_params(effect_type, operation, parameters)?;
189 handler
190 .close(&connection_id)
191 .await
192 .map_err(|e| HandlerError::ExecutionFailed {
193 source: Box::new(e),
194 })?;
195 Ok(Vec::new())
196 }
197 _ => Err(HandlerError::UnknownOperation {
198 effect_type,
199 operation: operation.to_string(),
200 }),
201 }
202 }
203
204 fn supported_operations(&self, effect_type: EffectType) -> Vec<String> {
205 collect_ops(effect_type, self.extended.is_some())
206 }
207
208 fn supports_effect(&self, effect_type: EffectType) -> bool {
209 effect_type == EffectType::Network
210 }
211
212 fn execution_mode(&self) -> ExecutionMode {
213 ExecutionMode::Production
214 }
215}