netbeam/sync/
mod.rs

1/*!
2 * # Netbeam Synchronization Module
3 *
4 * Core synchronization primitives and networking components for the Netbeam framework.
5 * This module provides the foundation for building reliable, bidirectional network
6 * communication channels with advanced synchronization capabilities.
7 *
8 * ## Features
9 * - Network endpoint management and addressing
10 * - Bidirectional channels with callback support
11 * - Request-response tracking and correlation
12 * - Network application synchronization
13 * - Symmetric conversation tracking
14 * - Reliable ordered streaming
15 *
16 * ## Module Structure
17 * - `operations/`: Network operation implementations (select, join)
18 * - `primitives/`: Core synchronization primitives (mutex, rwlock)
19 * - `channel/`: Channel implementations for network communication
20 * - `subscription/`: Subscription and event handling
21 *
22 * ## Important Notes
23 * - All network operations are async/await compatible
24 * - Implements reliable ordered streaming by default
25 * - Thread-safe with atomic operations
26 * - Supports symmetric conversations across nodes
27 *
28 * ## Related Components
29 * - `reliable_conn/`: Reliable connection handling
30 * - `proto/`: Protocol implementation details
31 */
32
33use serde::{Deserialize, Serialize};
34
35pub mod operations;
36pub mod primitives;
37
38pub mod subscription;
39
40pub mod network_application;
41pub mod network_endpoint;
42pub mod sync_start;
43
44pub mod channel;
45
46pub mod callback_channel;
47pub mod tracked_callback_channel;
48
49#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Debug, Copy, Clone)]
50/// Used to keep track between two symmetric actions across two nodes
51pub struct SymmetricConvID(u64);
52
53impl From<u64> for SymmetricConvID {
54    fn from(item: u64) -> Self {
55        Self(item)
56    }
57}
58
59pub mod test_utils {
60    use async_trait::async_trait;
61    use bytes::Bytes;
62    use citadel_io::tokio::net::{TcpListener, TcpStream};
63    use citadel_io::tokio::sync::Mutex;
64    use citadel_io::tokio_util::codec::{Framed, LengthDelimitedCodec};
65    use futures::stream::{SplitSink, SplitStream};
66    use futures::{SinkExt, StreamExt};
67
68    use crate::reliable_conn::simulator::NetworkConnSimulator;
69    use crate::reliable_conn::{ConnAddr, ReliableOrderedStreamToTarget};
70    use crate::sync::network_application::NetworkApplication;
71    use crate::sync::network_endpoint::NetworkEndpoint;
72    use crate::sync::RelativeNodeType;
73    use std::net::SocketAddr;
74
75    pub struct TcpCodecFramed {
76        sink: Mutex<SplitSink<Framed<TcpStream, LengthDelimitedCodec>, Bytes>>,
77        stream: Mutex<SplitStream<Framed<TcpStream, LengthDelimitedCodec>>>,
78        local_addr: SocketAddr,
79        peer_addr: SocketAddr,
80    }
81
82    #[async_trait]
83    impl ReliableOrderedStreamToTarget for TcpCodecFramed {
84        async fn send_to_peer(&self, input: &[u8]) -> std::io::Result<()> {
85            self.sink
86                .lock()
87                .await
88                .send(Bytes::copy_from_slice(input))
89                .await
90        }
91
92        async fn recv(&self) -> std::io::Result<Bytes> {
93            Ok(self
94                .stream
95                .lock()
96                .await
97                .next()
98                .await
99                .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Stream died"))??
100                .freeze())
101        }
102    }
103
104    impl ConnAddr for TcpCodecFramed {
105        fn local_addr(&self) -> std::io::Result<SocketAddr> {
106            Ok(self.local_addr)
107        }
108
109        fn peer_addr(&self) -> std::io::Result<SocketAddr> {
110            Ok(self.peer_addr)
111        }
112    }
113
114    fn codec(stream: TcpStream) -> TcpCodecFramed {
115        let local_addr = stream.local_addr().unwrap();
116        let peer_addr = stream.peer_addr().unwrap();
117        let (sink, stream) = LengthDelimitedCodec::builder().new_framed(stream).split();
118        TcpCodecFramed {
119            sink: Mutex::new(sink),
120            stream: Mutex::new(stream),
121            peer_addr,
122            local_addr,
123        }
124    }
125
126    fn create_listener<A: std::net::ToSocketAddrs>(addr: A) -> TcpListener {
127        let std_listener = std::net::TcpListener::bind(addr).unwrap();
128        std_listener.set_nonblocking(true).unwrap();
129        TcpListener::from_std(std_listener).unwrap()
130    }
131
132    fn create_connect<A: std::net::ToSocketAddrs>(addr: A) -> TcpStream {
133        let std_stream = std::net::TcpStream::connect(addr).unwrap();
134        std_stream.set_nonblocking(true).unwrap();
135        TcpStream::from_std(std_stream).unwrap()
136    }
137
138    pub async fn create_streams() -> (NetworkApplication, NetworkApplication) {
139        let (tx, rx) = citadel_io::tokio::sync::oneshot::channel();
140        let server = async move {
141            let listener = create_listener("127.0.0.1:0");
142            tx.send(listener.local_addr().unwrap()).unwrap();
143            NetworkApplication::register(
144                RelativeNodeType::Receiver,
145                NetworkConnSimulator::new(0, codec(listener.accept().await.unwrap().0)),
146            )
147            .await
148            .unwrap()
149        };
150
151        let client = async move {
152            let addr = rx.await.unwrap();
153            NetworkApplication::register(
154                RelativeNodeType::Initiator,
155                NetworkConnSimulator::new(0, codec(create_connect(addr))),
156            )
157            .await
158            .unwrap()
159        };
160
161        citadel_io::tokio::join!(server, client)
162    }
163
164    pub async fn create_streams_with_addrs_and_lag(
165        min: usize,
166    ) -> (NetworkEndpoint, NetworkEndpoint) {
167        let (tx, rx) = citadel_io::tokio::sync::oneshot::channel();
168        let server = async move {
169            let listener = create_listener("127.0.0.1:0");
170            tx.send(listener.local_addr().unwrap()).unwrap();
171            NetworkEndpoint::register(
172                RelativeNodeType::Receiver,
173                NetworkConnSimulator::new(min, codec(listener.accept().await.unwrap().0)),
174            )
175            .await
176            .unwrap()
177        };
178
179        let client = async move {
180            let addr = rx.await.unwrap();
181            NetworkEndpoint::register(
182                RelativeNodeType::Initiator,
183                NetworkConnSimulator::new(min, codec(create_connect(addr))),
184            )
185            .await
186            .unwrap()
187        };
188
189        citadel_io::tokio::join!(server, client)
190    }
191
192    pub async fn create_streams_with_addrs() -> (NetworkEndpoint, NetworkEndpoint) {
193        create_streams_with_addrs_and_lag(0).await
194    }
195}
196
197#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
198pub enum RelativeNodeType {
199    Initiator,
200    Receiver,
201}
202
203impl RelativeNodeType {
204    pub fn into_byte(self) -> u8 {
205        match self {
206            RelativeNodeType::Initiator => 10,
207            RelativeNodeType::Receiver => 20,
208        }
209    }
210
211    pub fn from_byte(byte: u8) -> Option<Self> {
212        match byte {
213            10 => Some(RelativeNodeType::Initiator),
214            20 => Some(RelativeNodeType::Receiver),
215            _ => None,
216        }
217    }
218}