1use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures_lite::{AsyncRead, AsyncWrite, StreamExt};
10use futures_util::{Sink, SinkExt};
11use serde::{Deserialize, Serialize};
12use tracing::debug;
13
14use crate::cbor::{into_cbor_sink, into_cbor_stream};
15use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};
16
17#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
18pub struct SyncTestTopic(String, pub [u8; 32]);
19
20impl SyncTestTopic {
21 pub fn new(name: &str) -> Self {
22 Self(name.to_owned(), [0; 32])
23 }
24}
25
26impl TopicQuery for SyncTestTopic {}
27
28#[derive(Debug, Serialize, Deserialize)]
29enum DummyProtocolMessage {
30 TopicQuery(SyncTestTopic),
31 Done,
32}
33
/// Stateless sync protocol used in tests.
///
/// Both sides only exchange a topic query and a `Done` marker; see the
/// `SyncProtocol` implementation for the exact message flow.
///
/// Derives `Clone` in addition to `Debug` so it matches the other protocol
/// types in this file (`PingPongProtocol`, `FailingProtocol`).
#[derive(Debug, Clone)]
pub struct DummyProtocol {}
37
38#[async_trait]
39impl<'a> SyncProtocol<'a, SyncTestTopic> for DummyProtocol {
40 fn name(&self) -> &'static str {
41 static DUMMY_PROTOCOL_NAME: &str = "dummy_protocol";
42 DUMMY_PROTOCOL_NAME
43 }
44
45 async fn initiate(
46 self: Arc<Self>,
47 topic_query: SyncTestTopic,
48 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
49 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
50 mut app_tx: Box<
51 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
52 >,
53 ) -> Result<(), SyncError> {
54 debug!("DummyProtocol: initiate sync session");
55
56 let mut sink = into_cbor_sink(tx);
57 let mut stream = into_cbor_stream(rx);
58
59 sink.send(DummyProtocolMessage::TopicQuery(topic_query.clone()))
60 .await?;
61 sink.send(DummyProtocolMessage::Done).await?;
62 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
63
64 while let Some(result) = stream.next().await {
65 let message: DummyProtocolMessage = result?;
66 debug!("message received: {:?}", message);
67
68 match &message {
69 DummyProtocolMessage::TopicQuery(_) => panic!(),
70 DummyProtocolMessage::Done => break,
71 }
72 }
73
74 sink.flush().await?;
75 app_tx.flush().await?;
76
77 Ok(())
78 }
79
80 async fn accept(
81 self: Arc<Self>,
82 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
83 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
84 mut app_tx: Box<
85 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
86 >,
87 ) -> Result<(), SyncError> {
88 debug!("DummyProtocol: accept sync session");
89
90 let mut sink = into_cbor_sink(tx);
91 let mut stream = into_cbor_stream(rx);
92
93 while let Some(result) = stream.next().await {
94 let message: DummyProtocolMessage = result?;
95 debug!("message received: {:?}", message);
96
97 match &message {
98 DummyProtocolMessage::TopicQuery(topic_query) => {
99 app_tx
100 .send(FromSync::HandshakeSuccess(topic_query.clone()))
101 .await?
102 }
103 DummyProtocolMessage::Done => break,
104 }
105 }
106
107 sink.send(DummyProtocolMessage::Done).await?;
108
109 sink.flush().await?;
110 app_tx.flush().await?;
111
112 Ok(())
113 }
114}
115
116#[derive(Serialize, Deserialize)]
118enum PingPongProtocolMessage {
119 TopicQuery(SyncTestTopic),
120 Ping,
121 Pong,
122}
123
/// Stateless test protocol in which the initiator sends a `Ping` and the
/// acceptor answers with a `Pong`; see the `SyncProtocol` implementation for
/// the exact message flow.
#[derive(Clone, Debug)]
pub struct PingPongProtocol {}
128
129#[async_trait]
130impl<'a> SyncProtocol<'a, SyncTestTopic> for PingPongProtocol {
131 fn name(&self) -> &'static str {
132 static SIMPLE_PROTOCOL_NAME: &str = "simple_protocol";
133 SIMPLE_PROTOCOL_NAME
134 }
135
136 async fn initiate(
137 self: Arc<Self>,
138 topic_query: SyncTestTopic,
139 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
140 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
141 mut app_tx: Box<
142 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
143 >,
144 ) -> Result<(), SyncError> {
145 debug!("initiate sync session");
146 let mut sink = into_cbor_sink(tx);
147 let mut stream = into_cbor_stream(rx);
148
149 sink.send(PingPongProtocolMessage::TopicQuery(topic_query.clone()))
150 .await?;
151 sink.send(PingPongProtocolMessage::Ping).await?;
152 debug!("ping message sent");
153
154 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
155
156 while let Some(result) = stream.next().await {
157 let message = result?;
158
159 match message {
160 PingPongProtocolMessage::TopicQuery(_) => panic!(),
161 PingPongProtocolMessage::Ping => {
162 return Err(SyncError::UnexpectedBehaviour(
163 "unexpected Ping message received".to_string(),
164 ));
165 }
166 PingPongProtocolMessage::Pong => {
167 debug!("pong message received");
168 app_tx
169 .send(FromSync::Data {
170 header: "PONG".as_bytes().to_owned(),
171 payload: None,
172 })
173 .await
174 .unwrap();
175 break;
176 }
177 }
178 }
179
180 sink.flush().await?;
182 app_tx.flush().await?;
183
184 Ok(())
185 }
186
187 async fn accept(
188 self: Arc<Self>,
189 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
190 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
191 mut app_tx: Box<
192 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
193 >,
194 ) -> Result<(), SyncError> {
195 debug!("accept sync session");
196 let mut sink = into_cbor_sink(tx);
197 let mut stream = into_cbor_stream(rx);
198
199 while let Some(result) = stream.next().await {
200 let message = result?;
201
202 match message {
203 PingPongProtocolMessage::TopicQuery(topic_query) => {
204 app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?
205 }
206 PingPongProtocolMessage::Ping => {
207 debug!("ping message received");
208 app_tx
209 .send(FromSync::Data {
210 header: "PING".as_bytes().to_owned(),
211 payload: None,
212 })
213 .await
214 .unwrap();
215
216 sink.send(PingPongProtocolMessage::Pong).await?;
217 debug!("pong message sent");
218 break;
219 }
220 PingPongProtocolMessage::Pong => {
221 return Err(SyncError::UnexpectedBehaviour(
222 "unexpected Pong message received".to_string(),
223 ));
224 }
225 }
226 }
227
228 sink.flush().await?;
229 app_tx.flush().await?;
230
231 Ok(())
232 }
233}
234
235#[derive(Debug, Serialize, Deserialize)]
236enum FailingProtocolMessage {
237 TopicQuery(SyncTestTopic),
238 Done,
239}
240
/// Test protocol whose variant selects which failure, if any, is triggered
/// during a sync session.
#[derive(Clone, Debug)]
// NOTE(review): presumably not every variant is constructed in every build,
// hence the lint exemption -- confirm before removing it.
#[allow(dead_code)]
pub enum FailingProtocol {
    /// `accept` returns `SyncError::Critical` before any message is read or
    /// written.
    AcceptorFailsCritical,

    /// `initiate` returns `SyncError::Critical` right after sending its topic
    /// query.
    InitiatorFailsCritical,

    /// `initiate` returns `SyncError::UnexpectedBehaviour` right after sending
    /// its topic query.
    InitiatorFailsUnexpected,

    /// `initiate` sends its topic query twice.
    InitiatorSendsTopicTwice,

    /// `accept` sends a topic query of its own before reading any message.
    AcceptorSendsTopic,

    /// Neither side triggers a failure on purpose.
    NoError,
}
267
268#[async_trait]
269impl<'a> SyncProtocol<'a, SyncTestTopic> for FailingProtocol {
270 fn name(&self) -> &'static str {
271 "failing-protocol"
272 }
273
274 async fn initiate(
275 self: Arc<Self>,
276 topic: SyncTestTopic,
277 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
278 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
279 mut app_tx: Box<
280 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
281 >,
282 ) -> Result<(), SyncError> {
283 let mut sink = into_cbor_sink(tx);
284 let mut stream = into_cbor_stream(rx);
285
286 sink.send(FailingProtocolMessage::TopicQuery(topic.clone()))
287 .await?;
288
289 if let FailingProtocol::InitiatorSendsTopicTwice = *self {
291 sink.send(FailingProtocolMessage::TopicQuery(topic.clone()))
292 .await?;
293 }
294
295 if let FailingProtocol::InitiatorFailsCritical = *self {
297 return Err(SyncError::Critical(
298 "something really bad happened in the initiator".to_string(),
299 ));
300 }
301
302 if let FailingProtocol::InitiatorFailsUnexpected = *self {
304 return Err(SyncError::UnexpectedBehaviour("bang!".to_string()));
305 }
306
307 sink.send(FailingProtocolMessage::Done).await?;
308
309 app_tx.send(FromSync::HandshakeSuccess(topic)).await?;
310
311 while let Some(result) = stream.next().await {
312 let message: FailingProtocolMessage = result?;
313 match &message {
314 FailingProtocolMessage::TopicQuery(_) => {
315 return Err(SyncError::UnexpectedBehaviour(
316 "unexpected message received from acceptor".to_string(),
317 ));
318 }
319 FailingProtocolMessage::Done => break,
320 }
321 }
322
323 Ok(())
324 }
325
326 async fn accept(
327 self: Arc<Self>,
328 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
329 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
330 mut app_tx: Box<
331 &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
332 >,
333 ) -> Result<(), SyncError> {
334 if let FailingProtocol::AcceptorFailsCritical = *self {
336 return Err(SyncError::Critical(
337 "something really bad happened in the acceptor".to_string(),
338 ));
339 }
340
341 let mut sink = into_cbor_sink(tx);
342 let mut stream = into_cbor_stream(rx);
343
344 if let FailingProtocol::AcceptorSendsTopic = *self {
347 let topic = SyncTestTopic::new("unexpected behaviour test");
348 sink.send(FailingProtocolMessage::TopicQuery(topic)).await?;
349 }
350
351 let mut received_topic = false;
352
353 while let Some(result) = stream.next().await {
354 let message: FailingProtocolMessage = result?;
355 match &message {
356 FailingProtocolMessage::TopicQuery(topic) => {
357 if !received_topic {
358 app_tx
359 .send(FromSync::HandshakeSuccess(topic.clone()))
360 .await?;
361 received_topic = true;
362 } else {
363 return Err(SyncError::UnexpectedBehaviour(
364 "received topic too often".to_string(),
365 ));
366 }
367 }
368 FailingProtocolMessage::Done => break,
369 }
370 }
371
372 sink.send(FailingProtocolMessage::Done).await?;
373
374 Ok(())
375 }
376}