Skip to main content

kithara_audio/pipeline/
fetch.rs

1/// Kind of fetch marker on the wire.
2#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3pub enum FetchKind {
4    /// Normal PCM payload.
5    Data,
6    /// Natural end-of-stream — the track played out to its duration.
7    NaturalEof,
8    /// Decoder / source failure. Transient during seek recovery or a
9    /// genuine unrecoverable error; never conflate with `NaturalEof`.
10    Failure,
11}
12
13/// Fetch result from a worker source.
14#[derive(Debug, Clone)]
15pub struct Fetch<C> {
16    /// The data chunk (ignored for non-`Data` kinds).
17    pub data: C,
18    /// Marker kind.
19    pub kind: FetchKind,
20    /// Epoch for seek invalidation (0 if unused).
21    pub epoch: u64,
22}
23
24impl<C> Fetch<C> {
25    /// Create a fetch result from the legacy `is_eof: bool` shape.
26    ///
27    /// `is_eof = true` maps to `FetchKind::NaturalEof`; `false` to
28    /// `FetchKind::Data`. Prefer the explicit constructors below.
29    pub fn new(data: C, is_eof: bool, epoch: u64) -> Self {
30        let kind = if is_eof {
31            FetchKind::NaturalEof
32        } else {
33            FetchKind::Data
34        };
35        Self { data, kind, epoch }
36    }
37
38    /// Explicit data chunk.
39    pub fn data(data: C, epoch: u64) -> Self {
40        Self {
41            data,
42            epoch,
43            kind: FetchKind::Data,
44        }
45    }
46
47    /// Get the epoch.
48    pub fn epoch(&self) -> u64 {
49        self.epoch
50    }
51
52    /// Explicit failure marker (distinct from natural EOF).
53    pub fn failure(data: C, epoch: u64) -> Self {
54        Self {
55            data,
56            epoch,
57            kind: FetchKind::Failure,
58        }
59    }
60
61    /// Consume and return the inner data.
62    pub fn into_inner(self) -> C {
63        self.data
64    }
65
66    /// True iff this is a natural-EOF marker. Does **not** cover
67    /// `Failure` — callers must use `is_failure()` or `is_terminal()`
68    /// for the broader "end of stream for any reason" check.
69    pub fn is_eof(&self) -> bool {
70        self.kind == FetchKind::NaturalEof
71    }
72
73    /// True iff this is any terminal marker (natural EOF or failure).
74    pub fn is_terminal(&self) -> bool {
75        !matches!(self.kind, FetchKind::Data)
76    }
77}
78
79/// Validator that checks epoch for seek invalidation.
80///
81/// Consumer increments epoch on seek; items with old epoch are discarded.
82#[derive(Debug, Clone, Default)]
83pub struct EpochValidator {
84    /// Current consumer epoch.
85    pub epoch: u64,
86}
87
88impl EpochValidator {
89    /// Check if a fetch result matches the current epoch.
90    pub fn is_valid<C>(&self, item: &Fetch<C>) -> bool {
91        item.epoch == self.epoch
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use kithara_test_utils::kithara;
98
99    use super::*;
100
101    #[kithara::test]
102    fn epoch_validator_keeps_matching_chunks() {
103        let mut validator = EpochValidator::default();
104        let item = Fetch::new(vec![1u8, 2, 3], false, 1);
105        validator.epoch = 1;
106        assert!(validator.is_valid(&item));
107    }
108
109    #[kithara::test]
110    fn epoch_validator_rejects_stale_chunks_after_seek() {
111        let mut validator = EpochValidator::default();
112        let stale = Fetch::new(vec![3u8], false, validator.epoch);
113        let first = Fetch::new(vec![1u8], false, validator.epoch);
114        validator.epoch = validator.epoch.wrapping_add(1);
115        let next = Fetch::new(vec![2u8], false, validator.epoch);
116
117        assert!(!validator.is_valid(&first));
118        assert!(!validator.is_valid(&stale));
119        assert!(validator.is_valid(&next));
120    }
121}