fluxion_core/
stream_item.rs1use crate::{fluxion_error::FluxionError, HasTimestamp, Timestamped};
6use core::cmp::Ordering;
7
8#[derive(Debug, Clone)]
14pub enum StreamItem<T> {
15 Value(T),
17 Error(FluxionError),
19}
20
21impl<T: PartialEq> PartialEq for StreamItem<T> {
22 fn eq(&self, other: &Self) -> bool {
23 match (self, other) {
24 (StreamItem::Value(a), StreamItem::Value(b)) => a == b,
25 _ => false, }
27 }
28}
29
30impl<T: Eq> Eq for StreamItem<T> {}
31
32impl<T: PartialOrd> PartialOrd for StreamItem<T> {
33 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
34 match (self, other) {
35 (StreamItem::Value(a), StreamItem::Value(b)) => a.partial_cmp(b),
36 (StreamItem::Error(_), StreamItem::Error(_)) => Some(Ordering::Equal),
37 (StreamItem::Error(_), StreamItem::Value(_)) => Some(Ordering::Less),
38 (StreamItem::Value(_), StreamItem::Error(_)) => Some(Ordering::Greater),
39 }
40 }
41}
42
43impl<T: Ord> Ord for StreamItem<T> {
44 fn cmp(&self, other: &Self) -> Ordering {
45 match (self, other) {
46 (StreamItem::Value(a), StreamItem::Value(b)) => a.cmp(b),
47 (StreamItem::Error(_), StreamItem::Error(_)) => Ordering::Equal,
48 (StreamItem::Error(_), StreamItem::Value(_)) => Ordering::Less,
49 (StreamItem::Value(_), StreamItem::Error(_)) => Ordering::Greater,
50 }
51 }
52}
53
54impl<T> StreamItem<T> {
55 pub const fn is_value(&self) -> bool {
57 matches!(self, StreamItem::Value(_))
58 }
59
60 pub const fn is_error(&self) -> bool {
62 matches!(self, StreamItem::Error(_))
63 }
64
65 pub fn ok(self) -> Option<T> {
67 match self {
68 StreamItem::Value(v) => Some(v),
69 StreamItem::Error(_) => None,
70 }
71 }
72
73 pub fn err(self) -> Option<FluxionError> {
75 match self {
76 StreamItem::Value(_) => None,
77 StreamItem::Error(e) => Some(e),
78 }
79 }
80
81 pub fn map<U, F>(self, f: F) -> StreamItem<U>
85 where
86 F: FnOnce(T) -> U,
87 {
88 match self {
89 StreamItem::Value(v) => StreamItem::Value(f(v)),
90 StreamItem::Error(e) => StreamItem::Error(e),
91 }
92 }
93
94 pub fn and_then<U, F>(self, f: F) -> StreamItem<U>
98 where
99 F: FnOnce(T) -> StreamItem<U>,
100 {
101 match self {
102 StreamItem::Value(v) => f(v),
103 StreamItem::Error(e) => StreamItem::Error(e),
104 }
105 }
106
107 pub fn unwrap(self) -> T
113 where
114 FluxionError: core::fmt::Debug,
115 {
116 match self {
117 StreamItem::Value(v) => v,
118 StreamItem::Error(e) => {
119 panic!("called `StreamItem::unwrap()` on an `Error` value: {:?}", e)
120 }
121 }
122 }
123
124 pub fn expect(self, msg: &str) -> T
130 where
131 FluxionError: core::fmt::Debug,
132 {
133 match self {
134 StreamItem::Value(v) => v,
135 StreamItem::Error(e) => panic!("{}: {:?}", msg, e),
136 }
137 }
138}
139
140impl<T> From<Result<T, FluxionError>> for StreamItem<T> {
141 fn from(result: Result<T, FluxionError>) -> Self {
142 match result {
143 Ok(v) => StreamItem::Value(v),
144 Err(e) => StreamItem::Error(e),
145 }
146 }
147}
148
149impl<T> From<StreamItem<T>> for Result<T, FluxionError> {
150 fn from(item: StreamItem<T>) -> Self {
151 match item {
152 StreamItem::Value(v) => Ok(v),
153 StreamItem::Error(e) => Err(e),
154 }
155 }
156}
157
158impl<T> HasTimestamp for StreamItem<T>
159where
160 T: Timestamped,
161{
162 type Timestamp = T::Timestamp;
163
164 fn timestamp(&self) -> Self::Timestamp {
165 match self {
166 StreamItem::Value(v) => v.timestamp(),
167 StreamItem::Error(_) => panic!("called `timestamp()` on StreamItem::Error"),
168 }
169 }
170}
171
172impl<T> Timestamped for StreamItem<T>
173where
174 T: Timestamped,
175{
176 type Inner = T::Inner;
177
178 fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
179 StreamItem::Value(T::with_timestamp(value, timestamp))
180 }
181
182 fn into_inner(self) -> Self::Inner {
183 match self {
184 StreamItem::Value(v) => v.into_inner(),
185 StreamItem::Error(_) => panic!("called `into_inner()` on StreamItem::Error"),
186 }
187 }
188}