Skip to main content

aura_core/effects/
network.rs

1//! Network effects trait definitions
2//!
3//! This module defines the trait interfaces for network communication operations.
4//! Implementations are provided by aura-agent handlers using aura-transport.
5//!
6//! # Effect Classification
7//!
8//! - **Category**: Infrastructure Effect
9//! - **Implementation**: `aura-effects` (Layer 3)
10//! - **Usage**: All crates needing network communication (TCP, message sending/receiving)
11//!
12//! This is an infrastructure effect that must be implemented in `aura-effects`
13//! with stateless handlers. Domain crates should not implement this trait directly.
14
15use async_trait::async_trait;
16use std::sync::Arc;
17use uuid::Uuid;
18
19/// Network address for peer communication
20#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
21pub struct NetworkAddress {
22    address: String,
23}
24
25impl NetworkAddress {
26    /// Create a new network address
27    pub fn new(address: String) -> Self {
28        Self { address }
29    }
30
31    /// Get the address string
32    pub fn as_str(&self) -> &str {
33        &self.address
34    }
35}
36
37impl From<&str> for NetworkAddress {
38    fn from(address: &str) -> Self {
39        Self::new(address.to_string())
40    }
41}
42
43impl From<String> for NetworkAddress {
44    fn from(address: String) -> Self {
45        Self::new(address)
46    }
47}
48
49/// Network operation errors
50#[derive(Debug, thiserror::Error, serde::Serialize, serde::Deserialize)]
51pub enum NetworkError {
52    /// Failed to send a message to the destination
53    #[error("Failed to send message to {peer_id:?}: {reason}")]
54    SendFailed {
55        /// Optional peer the send targeted
56        peer_id: Option<Uuid>,
57        /// Reason for the failure
58        reason: String,
59    },
60    /// Failed to receive a message from the source
61    #[error("Failed to receive message: {reason}")]
62    ReceiveFailed {
63        /// Reason for the failure
64        reason: String,
65    },
66    /// No message is available to receive
67    #[error("No message available")]
68    NoMessage,
69    /// Broadcast operation failed
70    #[error("Broadcast failed: {reason}")]
71    BroadcastFailed {
72        /// Reason for broadcast failure
73        reason: String,
74    },
75    /// Failed to establish a connection
76    #[error("Connection failed: {0}")]
77    ConnectionFailed(String),
78    /// Operation is unsupported by the current handler
79    #[error("Unsupported operation")]
80    NotImplemented,
81    /// Serialization failed while preparing a network payload
82    #[error("Serialization failed: {error}")]
83    SerializationFailed {
84        /// Serialization error message
85        error: String,
86    },
87    /// Deserialization failed while decoding a payload
88    #[error("Deserialization failed: {error}")]
89    DeserializationFailed {
90        /// Deserialization error message
91        error: String,
92    },
93    /// Operation timed out
94    #[error("Operation '{operation}' timed out after {timeout_ms}ms")]
95    OperationTimeout {
96        /// The operation that timed out
97        operation: String,
98        /// Timeout duration in milliseconds
99        timeout_ms: u64,
100    },
101    /// Request retry limit exceeded
102    #[error("Retry limit exceeded after {attempts} attempts. Last error: {last_error}")]
103    RetryLimitExceeded {
104        /// Number of retry attempts made
105        attempts: u32,
106        /// Error message from the last attempt
107        last_error: String,
108    },
109    /// Circuit breaker is open
110    #[error("Circuit breaker is open: {reason}")]
111    CircuitBreakerOpen {
112        /// Reason the circuit breaker was opened
113        reason: String,
114    },
115    /// Peer unreachable
116    #[error("Peer unreachable: {peer_id}")]
117    PeerUnreachable {
118        /// Identifier of the unreachable peer
119        peer_id: String,
120    },
121    /// Network partition detected
122    #[error("Network partition detected: {details}")]
123    NetworkPartition {
124        /// Details about the detected partition
125        details: String,
126    },
127    /// Message validation failed
128    #[error("Message validation failed: {reason}")]
129    ValidationFailed {
130        /// Reason validation failed
131        reason: String,
132    },
133    /// Rate limit exceeded
134    #[error("Rate limit exceeded: {limit} requests per {window_ms}ms window")]
135    RateLimitExceeded {
136        /// Request limit
137        limit: u32,
138        /// Time window in milliseconds
139        window_ms: u64,
140    },
141    /// Subscription to peer events failed
142    #[error("Subscription failed: {reason}")]
143    SubscriptionFailed {
144        /// Reason for failure
145        reason: String,
146    },
147}
148
149/// Opaque UDP endpoint wrapper to keep core effects runtime-agnostic.
150#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
151pub struct UdpEndpoint {
152    address: String,
153}
154
155impl UdpEndpoint {
156    /// Create a new UDP endpoint wrapper.
157    pub fn new(address: impl Into<String>) -> Self {
158        Self {
159            address: address.into(),
160        }
161    }
162
163    /// Get the address string.
164    pub fn as_str(&self) -> &str {
165        &self.address
166    }
167}
168
169impl std::fmt::Display for UdpEndpoint {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        write!(f, "{}", self.address)
172    }
173}
174
175impl From<&str> for UdpEndpoint {
176    fn from(address: &str) -> Self {
177        Self::new(address)
178    }
179}
180
181impl From<String> for UdpEndpoint {
182    fn from(address: String) -> Self {
183        Self::new(address)
184    }
185}
186
187/// UDP endpoint operations for Aura effects.
188#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
189#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
190pub trait UdpEndpointEffects: Send + Sync {
191    /// Enable or disable UDP broadcast on the socket.
192    async fn set_broadcast(&self, enabled: bool) -> Result<(), NetworkError>;
193
194    /// Send a datagram to the destination address.
195    async fn send_to(&self, payload: &[u8], addr: &UdpEndpoint) -> Result<usize, NetworkError>;
196
197    /// Receive a datagram into the provided buffer.
198    async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, UdpEndpoint), NetworkError>;
199}
200
201/// UDP effect surface for binding sockets.
202#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
203#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
204pub trait UdpEffects: Send + Sync {
205    /// Bind a UDP socket to the given address.
206    async fn udp_bind(
207        &self,
208        addr: UdpEndpoint,
209    ) -> Result<Arc<dyn UdpEndpointEffects>, NetworkError>;
210}
211
212#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
213#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
214impl<T: UdpEffects + ?Sized> UdpEffects for Arc<T> {
215    async fn udp_bind(
216        &self,
217        addr: UdpEndpoint,
218    ) -> Result<Arc<dyn UdpEndpointEffects>, NetworkError> {
219        (**self).udp_bind(addr).await
220    }
221}
222
223#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
224#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
225impl<T: UdpEndpointEffects + ?Sized> UdpEndpointEffects for Arc<T> {
226    async fn set_broadcast(&self, enabled: bool) -> Result<(), NetworkError> {
227        (**self).set_broadcast(enabled).await
228    }
229
230    async fn send_to(&self, payload: &[u8], addr: &UdpEndpoint) -> Result<usize, NetworkError> {
231        (**self).send_to(payload, addr).await
232    }
233
234    async fn recv_from(&self, buffer: &mut [u8]) -> Result<(usize, UdpEndpoint), NetworkError> {
235        (**self).recv_from(buffer).await
236    }
237}
238
239/// Stream type for peer connection events
240pub type PeerEventStream = std::pin::Pin<Box<dyn futures::Stream<Item = PeerEvent> + Send>>;
241
242/// Peer connection events
243#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
244pub enum PeerEvent {
245    /// Peer connected
246    Connected(Uuid),
247    /// Peer disconnected
248    Disconnected(Uuid),
249    /// Connection failed
250    ConnectionFailed(Uuid, String),
251}
252
253/// Network usability signal emitted by platform monitors.
254#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
255pub enum NetworkUsability {
256    /// Network is usable for peer connectivity.
257    Usable,
258    /// Network is currently unusable (captured with a reason string).
259    Unusable { reason: String },
260}
261
262/// Network change signal with monotonic generation.
263#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
264pub struct NetworkChange {
265    /// Monotonic generation identifier for network state transitions.
266    pub generation: u64,
267    /// Current usability state.
268    pub usability: NetworkUsability,
269    /// Snapshot of local interface addresses associated with this generation.
270    pub interfaces: Vec<NetworkAddress>,
271}
272
273/// Runtime-agnostic stream of network change notifications.
274#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
275#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
276pub trait NetworkChangeStream: Send {
277    /// Return the next network change, or `None` if the stream has closed.
278    async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError>;
279}
280
281#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
282#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
283impl<T: NetworkChangeStream + ?Sized> NetworkChangeStream for Box<T> {
284    async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError> {
285        (**self).next_change().await
286    }
287}
288
289/// Optional network change subscription surface.
290///
291/// Handlers that do not expose platform network-change notifications may keep
292/// the default `NotImplemented` behavior.
293#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
294#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
295pub trait NetworkChangeEffects: Send + Sync {
296    /// Subscribe to network change notifications.
297    async fn subscribe_network_changes(
298        &self,
299    ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
300        Err(NetworkError::NotImplemented)
301    }
302}
303
304#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
305#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
306impl<T: NetworkChangeEffects + ?Sized> NetworkChangeEffects for Arc<T> {
307    async fn subscribe_network_changes(
308        &self,
309    ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
310        (**self).subscribe_network_changes().await
311    }
312}
313
314/// Core network effects interface for communication operations.
315///
316/// This trait defines network operations for the Aura effects system.
317/// Implementations are provided in aura-agent handlers using aura-transport.
318/// Different implementations exist for:
319/// - Production: Real network communication
320/// - Testing: Mock network with controllable message delivery
321/// - Simulation: Network scenarios with partitions and faults
322#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
323#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
324pub trait NetworkCoreEffects: Send + Sync {
325    /// Send a message to a specific peer.
326    ///
327    /// `peer_id` is the wire UUID form of an `AuthorityId`, not a `DeviceId`.
328    async fn send_to_peer(&self, peer_id: Uuid, message: Vec<u8>) -> Result<(), NetworkError>;
329
330    /// Broadcast a message to all connected peers
331    async fn broadcast(&self, message: Vec<u8>) -> Result<(), NetworkError>;
332
333    /// Receive the next available message
334    async fn receive(&self) -> Result<(Uuid, Vec<u8>), NetworkError>;
335}
336
337/// Optional network effects that build on the core interface.
338#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
339#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
340pub trait NetworkExtendedEffects: NetworkCoreEffects + Send + Sync {
341    /// Receive message from a specific peer authority UUID on the wire.
342    async fn receive_from(&self, _peer_id: Uuid) -> Result<Vec<u8>, NetworkError> {
343        Err(NetworkError::NotImplemented)
344    }
345
346    /// Get currently connected peer authority UUIDs on the wire.
347    async fn connected_peers(&self) -> Vec<Uuid> {
348        Vec::new()
349    }
350
351    /// Check if a peer authority UUID is connected.
352    async fn is_peer_connected(&self, _peer_id: Uuid) -> bool {
353        false
354    }
355
356    /// Subscribe to peer connection events
357    async fn subscribe_to_peer_events(&self) -> Result<PeerEventStream, NetworkError> {
358        Err(NetworkError::NotImplemented)
359    }
360
361    // === Connection-oriented methods (for transport coordination) ===
362
363    /// Open a connection to the specified address
364    ///
365    /// Returns a connection identifier that can be used with `send` and `close`.
366    /// This provides lower-level connection management for transport coordinators.
367    async fn open(&self, _address: &str) -> Result<String, NetworkError> {
368        Err(NetworkError::NotImplemented)
369    }
370
371    /// Send data over an established connection
372    ///
373    /// The connection_id must be from a previous successful `open` call.
374    async fn send(&self, _connection_id: &str, _data: Vec<u8>) -> Result<(), NetworkError> {
375        Err(NetworkError::NotImplemented)
376    }
377
378    /// Close an established connection
379    ///
380    /// After closing, the connection_id is no longer valid.
381    async fn close(&self, _connection_id: &str) -> Result<(), NetworkError> {
382        Err(NetworkError::NotImplemented)
383    }
384}
385
386/// Combined network effects surface (core + extended).
387pub trait NetworkEffects: NetworkCoreEffects + NetworkExtendedEffects {}
388
389impl<T: NetworkCoreEffects + NetworkExtendedEffects + ?Sized> NetworkEffects for T {}
390/// Blanket implementation for Arc<T> where T: NetworkCoreEffects
391#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
392#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
393impl<T: NetworkCoreEffects + ?Sized> NetworkCoreEffects for std::sync::Arc<T> {
394    async fn send_to_peer(&self, peer_id: Uuid, message: Vec<u8>) -> Result<(), NetworkError> {
395        (**self).send_to_peer(peer_id, message).await
396    }
397
398    async fn broadcast(&self, message: Vec<u8>) -> Result<(), NetworkError> {
399        (**self).broadcast(message).await
400    }
401
402    async fn receive(&self) -> Result<(Uuid, Vec<u8>), NetworkError> {
403        (**self).receive().await
404    }
405}
406
407/// Blanket implementation for Arc<T> where T: NetworkExtendedEffects
408#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
409#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
410impl<T: NetworkExtendedEffects + ?Sized> NetworkExtendedEffects for std::sync::Arc<T> {
411    async fn receive_from(&self, peer_id: Uuid) -> Result<Vec<u8>, NetworkError> {
412        (**self).receive_from(peer_id).await
413    }
414
415    async fn connected_peers(&self) -> Vec<Uuid> {
416        (**self).connected_peers().await
417    }
418
419    async fn is_peer_connected(&self, peer_id: Uuid) -> bool {
420        (**self).is_peer_connected(peer_id).await
421    }
422
423    async fn subscribe_to_peer_events(&self) -> Result<PeerEventStream, NetworkError> {
424        (**self).subscribe_to_peer_events().await
425    }
426
427    async fn open(&self, address: &str) -> Result<String, NetworkError> {
428        (**self).open(address).await
429    }
430
431    async fn send(&self, connection_id: &str, data: Vec<u8>) -> Result<(), NetworkError> {
432        (**self).send(connection_id, data).await
433    }
434
435    async fn close(&self, connection_id: &str) -> Result<(), NetworkError> {
436        (**self).close(connection_id).await
437    }
438}