fluxion_core/
stream_item.rs

// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
4
5use crate::{fluxion_error::FluxionError, HasTimestamp, Timestamped};
6use core::cmp::Ordering;
7
/// A stream item that can be either a value or an error.
///
/// This enum allows operators to naturally propagate errors through the stream
/// while processing values, following Rx-style error semantics where errors
/// are propagated and do NOT terminate the stream.
///
/// The shape mirrors `Result<T, FluxionError>`, and `From` conversions exist in
/// both directions. When items are ordered, every `Error` sorts before every
/// `Value`.
#[derive(Debug, Clone)]
pub enum StreamItem<T> {
    /// A successful value
    Value(T),
    /// An error that does not terminate the stream (it only gets propagated)
    Error(FluxionError),
}
20
21impl<T: PartialEq> PartialEq for StreamItem<T> {
22    fn eq(&self, other: &Self) -> bool {
23        match (self, other) {
24            (StreamItem::Value(a), StreamItem::Value(b)) => a == b,
25            _ => false, // Errors are never equal
26        }
27    }
28}
29
// Marker impl: declares `==` on `StreamItem<T>` to be a full equivalence
// relation whenever `T: Eq`. It is also what allows the `Ord` impl to exist.
// NOTE(review): `Eq` promises reflexivity (`a == a`). Confirm that the
// `PartialEq` impl for this type compares `Error` with `Error` as equal;
// otherwise this marker violates the trait's documented contract.
impl<T: Eq> Eq for StreamItem<T> {}
31
32impl<T: PartialOrd> PartialOrd for StreamItem<T> {
33    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
34        match (self, other) {
35            (StreamItem::Value(a), StreamItem::Value(b)) => a.partial_cmp(b),
36            (StreamItem::Error(_), StreamItem::Error(_)) => Some(Ordering::Equal),
37            (StreamItem::Error(_), StreamItem::Value(_)) => Some(Ordering::Less),
38            (StreamItem::Value(_), StreamItem::Error(_)) => Some(Ordering::Greater),
39        }
40    }
41}
42
43impl<T: Ord> Ord for StreamItem<T> {
44    fn cmp(&self, other: &Self) -> Ordering {
45        match (self, other) {
46            (StreamItem::Value(a), StreamItem::Value(b)) => a.cmp(b),
47            (StreamItem::Error(_), StreamItem::Error(_)) => Ordering::Equal,
48            (StreamItem::Error(_), StreamItem::Value(_)) => Ordering::Less,
49            (StreamItem::Value(_), StreamItem::Error(_)) => Ordering::Greater,
50        }
51    }
52}
53
54impl<T> StreamItem<T> {
55    /// Returns `true` if this is a `Value`.
56    pub const fn is_value(&self) -> bool {
57        matches!(self, StreamItem::Value(_))
58    }
59
60    /// Returns `true` if this is an `Error`.
61    pub const fn is_error(&self) -> bool {
62        matches!(self, StreamItem::Error(_))
63    }
64
65    /// Converts from `StreamItem<T>` to `Option<T>`, discarding errors.
66    pub fn ok(self) -> Option<T> {
67        match self {
68            StreamItem::Value(v) => Some(v),
69            StreamItem::Error(_) => None,
70        }
71    }
72
73    /// Converts from `StreamItem<T>` to `Option<FluxionError>`, discarding values.
74    pub fn err(self) -> Option<FluxionError> {
75        match self {
76            StreamItem::Value(_) => None,
77            StreamItem::Error(e) => Some(e),
78        }
79    }
80
81    /// Maps a `StreamItem<T>` to `StreamItem<U>` by applying a function to the contained value.
82    ///
83    /// Errors are propagated unchanged.
84    pub fn map<U, F>(self, f: F) -> StreamItem<U>
85    where
86        F: FnOnce(T) -> U,
87    {
88        match self {
89            StreamItem::Value(v) => StreamItem::Value(f(v)),
90            StreamItem::Error(e) => StreamItem::Error(e),
91        }
92    }
93
94    /// Maps a `StreamItem<T>` to `StreamItem<U>` by applying a function that can fail.
95    ///
96    /// Errors are propagated unchanged.
97    pub fn and_then<U, F>(self, f: F) -> StreamItem<U>
98    where
99        F: FnOnce(T) -> StreamItem<U>,
100    {
101        match self {
102            StreamItem::Value(v) => f(v),
103            StreamItem::Error(e) => StreamItem::Error(e),
104        }
105    }
106
107    /// Returns the contained value, panicking if it's an error.
108    ///
109    /// # Panics
110    ///
111    /// Panics if the item is an `Error`.
112    pub fn unwrap(self) -> T
113    where
114        FluxionError: core::fmt::Debug,
115    {
116        match self {
117            StreamItem::Value(v) => v,
118            StreamItem::Error(e) => {
119                panic!("called `StreamItem::unwrap()` on an `Error` value: {:?}", e)
120            }
121        }
122    }
123
124    /// Returns the contained value, panicking with a custom message if it's an error.
125    ///
126    /// # Panics
127    ///
128    /// Panics with the provided message if the item is an `Error`.
129    pub fn expect(self, msg: &str) -> T
130    where
131        FluxionError: core::fmt::Debug,
132    {
133        match self {
134            StreamItem::Value(v) => v,
135            StreamItem::Error(e) => panic!("{}: {:?}", msg, e),
136        }
137    }
138}
139
140impl<T> From<Result<T, FluxionError>> for StreamItem<T> {
141    fn from(result: Result<T, FluxionError>) -> Self {
142        match result {
143            Ok(v) => StreamItem::Value(v),
144            Err(e) => StreamItem::Error(e),
145        }
146    }
147}
148
149impl<T> From<StreamItem<T>> for Result<T, FluxionError> {
150    fn from(item: StreamItem<T>) -> Self {
151        match item {
152            StreamItem::Value(v) => Ok(v),
153            StreamItem::Error(e) => Err(e),
154        }
155    }
156}
157
158impl<T> HasTimestamp for StreamItem<T>
159where
160    T: Timestamped,
161{
162    type Timestamp = T::Timestamp;
163
164    fn timestamp(&self) -> Self::Timestamp {
165        match self {
166            StreamItem::Value(v) => v.timestamp(),
167            StreamItem::Error(_) => panic!("called `timestamp()` on StreamItem::Error"),
168        }
169    }
170}
171
172impl<T> Timestamped for StreamItem<T>
173where
174    T: Timestamped,
175{
176    type Inner = T::Inner;
177
178    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self {
179        StreamItem::Value(T::with_timestamp(value, timestamp))
180    }
181
182    fn into_inner(self) -> Self::Inner {
183        match self {
184            StreamItem::Value(v) => v.into_inner(),
185            StreamItem::Error(_) => panic!("called `into_inner()` on StreamItem::Error"),
186        }
187    }
188}