arc_malachitebft_engine/util/
streaming.rs1use core::fmt;
2
3use bytes::Bytes;
4
/// Sequence number carried by a [`StreamMessage`].
///
/// [`StreamMessage::is_first`] treats sequence `0` as the first message.
pub type Sequence = u64;
6
7#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
8pub struct StreamId(pub(crate) Bytes);
9
10impl StreamId {
11 pub fn new(bytes: Bytes) -> Self {
12 Self(bytes)
13 }
14
15 pub fn to_bytes(&self) -> Bytes {
16 self.0.clone()
17 }
18}
19
20impl fmt::Display for StreamId {
21 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22 for byte in &self.0 {
23 write!(f, "{byte:02x}")?;
24 }
25 Ok(())
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30#[cfg_attr(
31 feature = "borsh",
32 derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize)
33)]
34pub struct StreamMessage<T> {
35 pub stream_id: StreamId,
39
40 pub sequence: Sequence,
42
43 pub content: StreamContent<T>,
45}
46
47impl<T> StreamMessage<T> {
48 pub fn new(stream_id: StreamId, sequence: Sequence, content: StreamContent<T>) -> Self {
49 Self {
50 stream_id,
51 sequence,
52 content,
53 }
54 }
55
56 pub fn is_first(&self) -> bool {
57 self.sequence == 0
58 }
59
60 pub fn is_fin(&self) -> bool {
61 self.content.is_fin()
62 }
63}
64
/// Content of a single stream message: either a payload or the `Fin` marker.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "borsh",
    derive(::borsh::BorshSerialize, ::borsh::BorshDeserialize)
)]
pub enum StreamContent<T> {
    /// A message carrying a payload of type `T`.
    Data(T),

    /// Payload-free marker.
    ///
    /// NOTE(review): presumably signals the end of the stream; confirm
    /// against the code that consumes these messages.
    Fin,
}

impl<T> StreamContent<T> {
    /// Borrows the payload, or returns `None` for [`StreamContent::Fin`].
    pub fn as_data(&self) -> Option<&T> {
        // Exhaustive on purpose: adding a variant must force a decision here
        // instead of silently falling into a wildcard arm.
        match self {
            Self::Data(data) => Some(data),
            Self::Fin => None,
        }
    }

    /// Consumes the content and returns the payload, or `None` for
    /// [`StreamContent::Fin`].
    pub fn into_data(self) -> Option<T> {
        // Exhaustive for the same reason as in `as_data`.
        match self {
            Self::Data(data) => Some(data),
            Self::Fin => None,
        }
    }

    /// Returns `true` when this is the [`StreamContent::Fin`] marker.
    pub fn is_fin(&self) -> bool {
        matches!(self, Self::Fin)
    }
}