1use serde::{Deserialize, Serialize};
34
35pub mod operations;
36pub mod primitives;
37
38pub mod subscription;
39
40pub mod network_application;
41pub mod network_endpoint;
42pub mod sync_start;
43
44pub mod channel;
45
46pub mod callback_channel;
47pub mod tracked_callback_channel;
48
49#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Debug, Copy, Clone)]
50pub struct SymmetricConvID(u64);
52
53impl From<u64> for SymmetricConvID {
54 fn from(item: u64) -> Self {
55 Self(item)
56 }
57}
58
59pub mod test_utils {
60 use async_trait::async_trait;
61 use bytes::Bytes;
62 use citadel_io::tokio::net::{TcpListener, TcpStream};
63 use citadel_io::tokio::sync::Mutex;
64 use citadel_io::tokio_util::codec::{Framed, LengthDelimitedCodec};
65 use futures::stream::{SplitSink, SplitStream};
66 use futures::{SinkExt, StreamExt};
67
68 use crate::reliable_conn::simulator::NetworkConnSimulator;
69 use crate::reliable_conn::{ConnAddr, ReliableOrderedStreamToTarget};
70 use crate::sync::network_application::NetworkApplication;
71 use crate::sync::network_endpoint::NetworkEndpoint;
72 use crate::sync::RelativeNodeType;
73 use std::net::SocketAddr;
74
75 pub struct TcpCodecFramed {
76 sink: Mutex<SplitSink<Framed<TcpStream, LengthDelimitedCodec>, Bytes>>,
77 stream: Mutex<SplitStream<Framed<TcpStream, LengthDelimitedCodec>>>,
78 local_addr: SocketAddr,
79 peer_addr: SocketAddr,
80 }
81
82 #[async_trait]
83 impl ReliableOrderedStreamToTarget for TcpCodecFramed {
84 async fn send_to_peer(&self, input: &[u8]) -> std::io::Result<()> {
85 self.sink
86 .lock()
87 .await
88 .send(Bytes::copy_from_slice(input))
89 .await
90 }
91
92 async fn recv(&self) -> std::io::Result<Bytes> {
93 Ok(self
94 .stream
95 .lock()
96 .await
97 .next()
98 .await
99 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Stream died"))??
100 .freeze())
101 }
102 }
103
104 impl ConnAddr for TcpCodecFramed {
105 fn local_addr(&self) -> std::io::Result<SocketAddr> {
106 Ok(self.local_addr)
107 }
108
109 fn peer_addr(&self) -> std::io::Result<SocketAddr> {
110 Ok(self.peer_addr)
111 }
112 }
113
114 fn codec(stream: TcpStream) -> TcpCodecFramed {
115 let local_addr = stream.local_addr().unwrap();
116 let peer_addr = stream.peer_addr().unwrap();
117 let (sink, stream) = LengthDelimitedCodec::builder().new_framed(stream).split();
118 TcpCodecFramed {
119 sink: Mutex::new(sink),
120 stream: Mutex::new(stream),
121 peer_addr,
122 local_addr,
123 }
124 }
125
126 fn create_listener<A: std::net::ToSocketAddrs>(addr: A) -> TcpListener {
127 let std_listener = std::net::TcpListener::bind(addr).unwrap();
128 std_listener.set_nonblocking(true).unwrap();
129 TcpListener::from_std(std_listener).unwrap()
130 }
131
132 fn create_connect<A: std::net::ToSocketAddrs>(addr: A) -> TcpStream {
133 let std_stream = std::net::TcpStream::connect(addr).unwrap();
134 std_stream.set_nonblocking(true).unwrap();
135 TcpStream::from_std(std_stream).unwrap()
136 }
137
138 pub async fn create_streams() -> (NetworkApplication, NetworkApplication) {
139 let (tx, rx) = citadel_io::tokio::sync::oneshot::channel();
140 let server = async move {
141 let listener = create_listener("127.0.0.1:0");
142 tx.send(listener.local_addr().unwrap()).unwrap();
143 NetworkApplication::register(
144 RelativeNodeType::Receiver,
145 NetworkConnSimulator::new(0, codec(listener.accept().await.unwrap().0)),
146 )
147 .await
148 .unwrap()
149 };
150
151 let client = async move {
152 let addr = rx.await.unwrap();
153 NetworkApplication::register(
154 RelativeNodeType::Initiator,
155 NetworkConnSimulator::new(0, codec(create_connect(addr))),
156 )
157 .await
158 .unwrap()
159 };
160
161 citadel_io::tokio::join!(server, client)
162 }
163
164 pub async fn create_streams_with_addrs_and_lag(
165 min: usize,
166 ) -> (NetworkEndpoint, NetworkEndpoint) {
167 let (tx, rx) = citadel_io::tokio::sync::oneshot::channel();
168 let server = async move {
169 let listener = create_listener("127.0.0.1:0");
170 tx.send(listener.local_addr().unwrap()).unwrap();
171 NetworkEndpoint::register(
172 RelativeNodeType::Receiver,
173 NetworkConnSimulator::new(min, codec(listener.accept().await.unwrap().0)),
174 )
175 .await
176 .unwrap()
177 };
178
179 let client = async move {
180 let addr = rx.await.unwrap();
181 NetworkEndpoint::register(
182 RelativeNodeType::Initiator,
183 NetworkConnSimulator::new(min, codec(create_connect(addr))),
184 )
185 .await
186 .unwrap()
187 };
188
189 citadel_io::tokio::join!(server, client)
190 }
191
192 pub async fn create_streams_with_addrs() -> (NetworkEndpoint, NetworkEndpoint) {
193 create_streams_with_addrs_and_lag(0).await
194 }
195}
196
197#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
198pub enum RelativeNodeType {
199 Initiator,
200 Receiver,
201}
202
203impl RelativeNodeType {
204 pub fn into_byte(self) -> u8 {
205 match self {
206 RelativeNodeType::Initiator => 10,
207 RelativeNodeType::Receiver => 20,
208 }
209 }
210
211 pub fn from_byte(byte: u8) -> Option<Self> {
212 match byte {
213 10 => Some(RelativeNodeType::Initiator),
214 20 => Some(RelativeNodeType::Receiver),
215 _ => None,
216 }
217 }
218}