pallas_multiplexer/
sync.rs

1use crate::{
2    agents::{self, ChannelBuffer},
3    bearers::{Bearer, Segment},
4    Payload,
5};
6
7use std::time::Instant;
8
9pub struct SyncPlexer {
10    bearer: Bearer,
11    protocol: u16,
12    clock: Instant,
13}
14
15impl SyncPlexer {
16    pub fn new(bearer: Bearer, protocol: u16) -> Self {
17        Self {
18            bearer,
19            protocol,
20            clock: Instant::now(),
21        }
22    }
23
24    pub fn unwrap(self) -> Bearer {
25        self.bearer
26    }
27}
28
/// Convenience alias for a [`ChannelBuffer`] whose underlying channel is a
/// [`SyncPlexer`].
pub type SyncChannel = ChannelBuffer<SyncPlexer>;
30
31impl agents::Channel for SyncPlexer {
32    fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> {
33        let segment = Segment::new(self.clock, self.protocol, payload);
34
35        self.bearer
36            .write_segment(segment)
37            .map_err(|_| agents::ChannelError::NotConnected(None))
38    }
39
40    fn dequeue_chunk(&mut self) -> Result<Payload, agents::ChannelError> {
41        match self.bearer.read_segment() {
42            Ok(segment) => match segment {
43                Some(x) => {
44                    assert_eq!(
45                        x.protocol, self.protocol,
46                        "sync plexer received payload for wrong protocol"
47                    );
48                    Ok(x.payload)
49                }
50                None => Err(agents::ChannelError::NotConnected(None)),
51            },
52            Err(_) => Err(agents::ChannelError::NotConnected(None)),
53        }
54    }
55}