Skip to main content

arc_malachitebft_engine/util/
streaming.rs

1use core::fmt;
2
3use bytes::Bytes;
4
/// Position of a message within its stream, held as an unsigned 64-bit integer.
pub type Sequence = u64;
6
7#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
8pub struct StreamId(pub(crate) Bytes);
9
10impl StreamId {
11    pub fn new(bytes: Bytes) -> Self {
12        Self(bytes)
13    }
14
15    pub fn to_bytes(&self) -> Bytes {
16        self.0.clone()
17    }
18}
19
20impl fmt::Display for StreamId {
21    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22        for byte in &self.0 {
23            write!(f, "{byte:02x}")?;
24        }
25        Ok(())
26    }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30#[cfg_attr(
31    feature = "borsh",
32    derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize)
33)]
34pub struct StreamMessage<T> {
35    /// Receivers identify streams by (sender, stream_id).
36    /// This means each node can allocate stream_ids independently
37    /// and that many streams can be sent on a single network topic.
38    pub stream_id: StreamId,
39
40    /// Identifies the sequence of each message in the stream starting from 0.
41    pub sequence: Sequence,
42
43    /// The content of this stream message
44    pub content: StreamContent<T>,
45}
46
47impl<T> StreamMessage<T> {
48    pub fn new(stream_id: StreamId, sequence: Sequence, content: StreamContent<T>) -> Self {
49        Self {
50            stream_id,
51            sequence,
52            content,
53        }
54    }
55
56    pub fn is_first(&self) -> bool {
57        self.sequence == 0
58    }
59
60    pub fn is_fin(&self) -> bool {
61        self.content.is_fin()
62    }
63}
64
/// Content of a single stream message: either a piece of data or the
/// end-of-stream marker.
#[derive(Clone, Debug, PartialEq, Eq)]
// The leading `::` pins the derive paths to the external `borsh` crate, the same
// form the `StreamMessage` derive in this file uses.
#[cfg_attr(
    feature = "borsh",
    derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize)
)]
pub enum StreamContent<T> {
    /// Serialized content.
    Data(T),

    /// Indicates the end of the stream.
    Fin,
}
77
78impl<T> StreamContent<T> {
79    pub fn as_data(&self) -> Option<&T> {
80        match self {
81            Self::Data(data) => Some(data),
82            _ => None,
83        }
84    }
85
86    pub fn into_data(self) -> Option<T> {
87        match self {
88            Self::Data(data) => Some(data),
89            _ => None,
90        }
91    }
92
93    pub fn is_fin(&self) -> bool {
94        matches!(self, Self::Fin)
95    }
96}