pallas_multiplexer/
sync.rs1use crate::{
2 agents::{self, ChannelBuffer},
3 bearers::{Bearer, Segment},
4 Payload,
5};
6
7use std::time::Instant;
8
9pub struct SyncPlexer {
10 bearer: Bearer,
11 protocol: u16,
12 clock: Instant,
13}
14
15impl SyncPlexer {
16 pub fn new(bearer: Bearer, protocol: u16) -> Self {
17 Self {
18 bearer,
19 protocol,
20 clock: Instant::now(),
21 }
22 }
23
24 pub fn unwrap(self) -> Bearer {
25 self.bearer
26 }
27}
28
29pub type SyncChannel = ChannelBuffer<SyncPlexer>;
30
31impl agents::Channel for SyncPlexer {
32 fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> {
33 let segment = Segment::new(self.clock, self.protocol, payload);
34
35 self.bearer
36 .write_segment(segment)
37 .map_err(|_| agents::ChannelError::NotConnected(None))
38 }
39
40 fn dequeue_chunk(&mut self) -> Result<Payload, agents::ChannelError> {
41 match self.bearer.read_segment() {
42 Ok(segment) => match segment {
43 Some(x) => {
44 assert_eq!(
45 x.protocol, self.protocol,
46 "sync plexer received payload for wrong protocol"
47 );
48 Ok(x.payload)
49 }
50 None => Err(agents::ChannelError::NotConnected(None)),
51 },
52 Err(_) => Err(agents::ChannelError::NotConnected(None)),
53 }
54 }
55}