pallas_multiplexer/
demux.rs

1use std::collections::HashMap;
2
3use crate::{bearers::Bearer, Payload};
4
5pub struct EgressError(pub Payload);
6
7pub trait Egress {
8    fn send(&mut self, payload: Payload) -> Result<(), EgressError>;
9}
10
11pub enum DemuxError {
12    BearerError(std::io::Error),
13    EgressDisconnected(u16, Payload),
14    EgressUnknown(u16, Payload),
15}
16
/// Outcome of a successful [`Demuxer::tick`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TickOutcome {
    /// A segment was read from the bearer and dispatched to its egress.
    Busy,
    /// The bearer returned no segment on this tick.
    Idle,
}
21
22/// A demuxer that reads from a bearer into the corresponding egress
23pub struct Demuxer<E> {
24    bearer: Bearer,
25    egress: HashMap<u16, E>,
26}
27
28impl<E> Demuxer<E>
29where
30    E: Egress,
31{
32    pub fn new(bearer: Bearer) -> Self {
33        Demuxer {
34            bearer,
35            egress: Default::default(),
36        }
37    }
38
39    pub fn register(&mut self, id: u16, tx: E) {
40        self.egress.insert(id, tx);
41    }
42
43    pub fn unregister(&mut self, id: u16) -> Option<E> {
44        self.egress.remove(&id)
45    }
46
47    fn dispatch(&mut self, protocol: u16, payload: Payload) -> Result<(), DemuxError> {
48        match self.egress.get_mut(&protocol) {
49            Some(tx) => match tx.send(payload) {
50                Err(EgressError(p)) => Err(DemuxError::EgressDisconnected(protocol, p)),
51                Ok(_) => Ok(()),
52            },
53            None => Err(DemuxError::EgressUnknown(protocol, payload)),
54        }
55    }
56
57    pub fn tick(&mut self) -> Result<TickOutcome, DemuxError> {
58        match self.bearer.read_segment() {
59            Err(err) => Err(DemuxError::BearerError(err)),
60            Ok(None) => Ok(TickOutcome::Idle),
61            Ok(Some(segment)) => match self.dispatch(segment.protocol, segment.payload) {
62                Err(err) => Err(err),
63                Ok(()) => Ok(TickOutcome::Busy),
64            },
65        }
66    }
67}