pallas_multiplexer/
demux.rs1use std::collections::HashMap;
2
3use crate::{bearers::Bearer, Payload};
4
5pub struct EgressError(pub Payload);
6
7pub trait Egress {
8 fn send(&mut self, payload: Payload) -> Result<(), EgressError>;
9}
10
11pub enum DemuxError {
12 BearerError(std::io::Error),
13 EgressDisconnected(u16, Payload),
14 EgressUnknown(u16, Payload),
15}
16
/// Result of a single successful demuxer tick.
///
/// The derives make the value printable, copyable and comparable, so callers
/// can log it or assert on it without pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TickOutcome {
    /// A segment was read and its payload was dispatched.
    Busy,
    /// No segment was available to read.
    Idle,
}
21
22pub struct Demuxer<E> {
24 bearer: Bearer,
25 egress: HashMap<u16, E>,
26}
27
28impl<E> Demuxer<E>
29where
30 E: Egress,
31{
32 pub fn new(bearer: Bearer) -> Self {
33 Demuxer {
34 bearer,
35 egress: Default::default(),
36 }
37 }
38
39 pub fn register(&mut self, id: u16, tx: E) {
40 self.egress.insert(id, tx);
41 }
42
43 pub fn unregister(&mut self, id: u16) -> Option<E> {
44 self.egress.remove(&id)
45 }
46
47 fn dispatch(&mut self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
48 match self.egress.get_mut(&protocol) {
49 Some(tx) => match tx.send(payload) {
50 Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)),
51 Ok(_) => Ok(()),
52 },
53 None => Err(DemuxError::EgressUnknown(protocol, payload)),
54 }
55 }
56
57 pub fn tick(&mut self) -> Result<TickOutcome, DemuxError> {
58 match self.bearer.read_segment() {
59 Err(err) => Err(DemuxError::BearerError(err)),
60 Ok(None) => Ok(TickOutcome::Idle),
61 Ok(Some(segment)) => match self.dispatch(segment.protocol, segment.payload) {
62 Err(err) => Err(err),
63 Ok(()) => Ok(TickOutcome::Busy),
64 },
65 }
66 }
67}