p2panda_sync/
test_protocols.rs

//! Sync protocol implementations for testing purposes.
//!
//! - `DummyProtocol`
//! - `PingPongProtocol`
//! - `FailingProtocol`
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use futures_lite::{AsyncRead, AsyncWrite, StreamExt};
10use futures_util::{Sink, SinkExt};
11use serde::{Deserialize, Serialize};
12use tracing::debug;
13
14use crate::cbor::{into_cbor_sink, into_cbor_stream};
15use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};
16
17#[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
18pub struct SyncTestTopic(String, pub [u8; 32]);
19
20impl SyncTestTopic {
21    pub fn new(name: &str) -> Self {
22        Self(name.to_owned(), [0; 32])
23    }
24}
25
26impl TopicQuery for SyncTestTopic {}
27
28#[derive(Debug, Serialize, Deserialize)]
29enum DummyProtocolMessage {
30    TopicQuery(SyncTestTopic),
31    Done,
32}
33
34/// A sync implementation which fulfills basic protocol requirements but nothing more
35#[derive(Debug)]
36pub struct DummyProtocol {}
37
38#[async_trait]
39impl<'a> SyncProtocol<'a, SyncTestTopic> for DummyProtocol {
40    fn name(&self) -> &'static str {
41        static DUMMY_PROTOCOL_NAME: &str = "dummy_protocol";
42        DUMMY_PROTOCOL_NAME
43    }
44
45    async fn initiate(
46        self: Arc<Self>,
47        topic_query: SyncTestTopic,
48        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
49        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
50        mut app_tx: Box<
51            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
52        >,
53    ) -> Result<(), SyncError> {
54        debug!("DummyProtocol: initiate sync session");
55
56        let mut sink = into_cbor_sink(tx);
57        let mut stream = into_cbor_stream(rx);
58
59        sink.send(DummyProtocolMessage::TopicQuery(topic_query.clone()))
60            .await?;
61        sink.send(DummyProtocolMessage::Done).await?;
62        app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
63
64        while let Some(result) = stream.next().await {
65            let message: DummyProtocolMessage = result?;
66            debug!("message received: {:?}", message);
67
68            match &message {
69                DummyProtocolMessage::TopicQuery(_) => panic!(),
70                DummyProtocolMessage::Done => break,
71            }
72        }
73
74        sink.flush().await?;
75        app_tx.flush().await?;
76
77        Ok(())
78    }
79
80    async fn accept(
81        self: Arc<Self>,
82        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
83        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
84        mut app_tx: Box<
85            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
86        >,
87    ) -> Result<(), SyncError> {
88        debug!("DummyProtocol: accept sync session");
89
90        let mut sink = into_cbor_sink(tx);
91        let mut stream = into_cbor_stream(rx);
92
93        while let Some(result) = stream.next().await {
94            let message: DummyProtocolMessage = result?;
95            debug!("message received: {:?}", message);
96
97            match &message {
98                DummyProtocolMessage::TopicQuery(topic_query) => {
99                    app_tx
100                        .send(FromSync::HandshakeSuccess(topic_query.clone()))
101                        .await?
102                }
103                DummyProtocolMessage::Done => break,
104            }
105        }
106
107        sink.send(DummyProtocolMessage::Done).await?;
108
109        sink.flush().await?;
110        app_tx.flush().await?;
111
112        Ok(())
113    }
114}
115
116// The protocol message types.
117#[derive(Serialize, Deserialize)]
118enum PingPongProtocolMessage {
119    TopicQuery(SyncTestTopic),
120    Ping,
121    Pong,
122}
123
124/// A sync implementation where the initiator sends a `ping` message and the acceptor responds with
125/// a `pong` message.
126#[derive(Debug, Clone)]
127pub struct PingPongProtocol {}
128
129#[async_trait]
130impl<'a> SyncProtocol<'a, SyncTestTopic> for PingPongProtocol {
131    fn name(&self) -> &'static str {
132        static SIMPLE_PROTOCOL_NAME: &str = "simple_protocol";
133        SIMPLE_PROTOCOL_NAME
134    }
135
136    async fn initiate(
137        self: Arc<Self>,
138        topic_query: SyncTestTopic,
139        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
140        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
141        mut app_tx: Box<
142            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
143        >,
144    ) -> Result<(), SyncError> {
145        debug!("initiate sync session");
146        let mut sink = into_cbor_sink(tx);
147        let mut stream = into_cbor_stream(rx);
148
149        sink.send(PingPongProtocolMessage::TopicQuery(topic_query.clone()))
150            .await?;
151        sink.send(PingPongProtocolMessage::Ping).await?;
152        debug!("ping message sent");
153
154        app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
155
156        while let Some(result) = stream.next().await {
157            let message = result?;
158
159            match message {
160                PingPongProtocolMessage::TopicQuery(_) => panic!(),
161                PingPongProtocolMessage::Ping => {
162                    return Err(SyncError::UnexpectedBehaviour(
163                        "unexpected Ping message received".to_string(),
164                    ));
165                }
166                PingPongProtocolMessage::Pong => {
167                    debug!("pong message received");
168                    app_tx
169                        .send(FromSync::Data {
170                            header: "PONG".as_bytes().to_owned(),
171                            payload: None,
172                        })
173                        .await
174                        .unwrap();
175                    break;
176                }
177            }
178        }
179
180        // Flush all bytes so that no messages are lost.
181        sink.flush().await?;
182        app_tx.flush().await?;
183
184        Ok(())
185    }
186
187    async fn accept(
188        self: Arc<Self>,
189        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
190        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
191        mut app_tx: Box<
192            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
193        >,
194    ) -> Result<(), SyncError> {
195        debug!("accept sync session");
196        let mut sink = into_cbor_sink(tx);
197        let mut stream = into_cbor_stream(rx);
198
199        while let Some(result) = stream.next().await {
200            let message = result?;
201
202            match message {
203                PingPongProtocolMessage::TopicQuery(topic_query) => {
204                    app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?
205                }
206                PingPongProtocolMessage::Ping => {
207                    debug!("ping message received");
208                    app_tx
209                        .send(FromSync::Data {
210                            header: "PING".as_bytes().to_owned(),
211                            payload: None,
212                        })
213                        .await
214                        .unwrap();
215
216                    sink.send(PingPongProtocolMessage::Pong).await?;
217                    debug!("pong message sent");
218                    break;
219                }
220                PingPongProtocolMessage::Pong => {
221                    return Err(SyncError::UnexpectedBehaviour(
222                        "unexpected Pong message received".to_string(),
223                    ));
224                }
225            }
226        }
227
228        sink.flush().await?;
229        app_tx.flush().await?;
230
231        Ok(())
232    }
233}
234
235#[derive(Debug, Serialize, Deserialize)]
236enum FailingProtocolMessage {
237    TopicQuery(SyncTestTopic),
238    Done,
239}
240
/// A sync implementation which returns a mocked error.
///
/// Each variant selects the one failure which the protocol implementation simulates.
#[derive(Debug, Clone)]
// NOTE(review): presumably not every variant is constructed in every build of this crate;
// confirm before removing this lint exception.
#[allow(dead_code)]
pub enum FailingProtocol {
    /// A critical error is returned from `accept()` right away, before the acceptor has read or
    /// sent any sync message.
    AcceptorFailsCritical,

    /// A critical error is returned from `initiate()` after the topic query has been sent, but
    /// before `Done` is sent and before the handshake success is reported to the application.
    InitiatorFailsCritical,

    /// An unexpected behaviour error is triggered inside `initiate()` after the topic query has
    /// been sent.
    InitiatorFailsUnexpected,

    /// An unexpected behaviour error is triggered inside `accept()` by sending the topic twice
    /// from `initiate()`.
    InitiatorSendsTopicTwice,

    /// An unexpected behaviour error is triggered inside `initiate()` by sending a topic from
    /// `accept()`.
    AcceptorSendsTopic,

    /// No errors are explicitly triggered; used for "happy path" test.
    NoError,
}
267
268#[async_trait]
269impl<'a> SyncProtocol<'a, SyncTestTopic> for FailingProtocol {
270    fn name(&self) -> &'static str {
271        "failing-protocol"
272    }
273
274    async fn initiate(
275        self: Arc<Self>,
276        topic: SyncTestTopic,
277        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
278        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
279        mut app_tx: Box<
280            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
281        >,
282    ) -> Result<(), SyncError> {
283        let mut sink = into_cbor_sink(tx);
284        let mut stream = into_cbor_stream(rx);
285
286        sink.send(FailingProtocolMessage::TopicQuery(topic.clone()))
287            .await?;
288
289        // Simulate critical sync implementation bug by sending the topic a second time.
290        if let FailingProtocol::InitiatorSendsTopicTwice = *self {
291            sink.send(FailingProtocolMessage::TopicQuery(topic.clone()))
292                .await?;
293        }
294
295        // Simulate some critical error which occurred inside the sync session.
296        if let FailingProtocol::InitiatorFailsCritical = *self {
297            return Err(SyncError::Critical(
298                "something really bad happened in the initiator".to_string(),
299            ));
300        }
301
302        // Simulate unexpected behaviour (such as a broken pipe due to disconnection).
303        if let FailingProtocol::InitiatorFailsUnexpected = *self {
304            return Err(SyncError::UnexpectedBehaviour("bang!".to_string()));
305        }
306
307        sink.send(FailingProtocolMessage::Done).await?;
308
309        app_tx.send(FromSync::HandshakeSuccess(topic)).await?;
310
311        while let Some(result) = stream.next().await {
312            let message: FailingProtocolMessage = result?;
313            match &message {
314                FailingProtocolMessage::TopicQuery(_) => {
315                    return Err(SyncError::UnexpectedBehaviour(
316                        "unexpected message received from acceptor".to_string(),
317                    ));
318                }
319                FailingProtocolMessage::Done => break,
320            }
321        }
322
323        Ok(())
324    }
325
326    async fn accept(
327        self: Arc<Self>,
328        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
329        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
330        mut app_tx: Box<
331            &'a mut (dyn Sink<FromSync<SyncTestTopic>, Error = SyncError> + Send + Unpin),
332        >,
333    ) -> Result<(), SyncError> {
334        // Simulate some critical error which occurred inside the sync session.
335        if let FailingProtocol::AcceptorFailsCritical = *self {
336            return Err(SyncError::Critical(
337                "something really bad happened in the acceptor".to_string(),
338            ));
339        }
340
341        let mut sink = into_cbor_sink(tx);
342        let mut stream = into_cbor_stream(rx);
343
344        // Simulate critical sync implementation bug by sending the topic from the acceptor (it
345        // _never_ sends any topics).
346        if let FailingProtocol::AcceptorSendsTopic = *self {
347            let topic = SyncTestTopic::new("unexpected behaviour test");
348            sink.send(FailingProtocolMessage::TopicQuery(topic)).await?;
349        }
350
351        let mut received_topic = false;
352
353        while let Some(result) = stream.next().await {
354            let message: FailingProtocolMessage = result?;
355            match &message {
356                FailingProtocolMessage::TopicQuery(topic) => {
357                    if !received_topic {
358                        app_tx
359                            .send(FromSync::HandshakeSuccess(topic.clone()))
360                            .await?;
361                        received_topic = true;
362                    } else {
363                        return Err(SyncError::UnexpectedBehaviour(
364                            "received topic too often".to_string(),
365                        ));
366                    }
367                }
368                FailingProtocolMessage::Done => break,
369            }
370        }
371
372        sink.send(FailingProtocolMessage::Done).await?;
373
374        Ok(())
375    }
376}