1#![forbid(unsafe_code)]
2
3use std::{
4 error::Error as StdError,
5 fmt,
6 future::Future,
7 io::{self, Error as IoError, ErrorKind, Read, Seek, SeekFrom},
8 num::NonZeroUsize,
9 ops::Range,
10 sync::Arc,
11};
12
13use kithara_platform::{MaybeSend, MaybeSync, thread::yield_now, time::Duration, tokio::task};
14use kithara_storage::WaitOutcome;
15use kithara_test_utils::kithara;
16
17#[derive(Debug)]
25#[non_exhaustive]
26pub enum StreamReadError {
27 Source(IoError),
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum StreamReadOutcome {
41 Bytes {
44 count: NonZeroUsize,
45 byte_position: u64,
46 },
47 Pending(PendingReason),
50 Eof { byte_position: u64 },
53}
54
55#[derive(Debug, Clone, Copy)]
64pub struct StreamSeekPastEof {
65 pub current_pos: u64,
66 pub len: u64,
67 pub new_pos: u64,
68}
69
70impl fmt::Display for StreamSeekPastEof {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 write!(
73 f,
74 "seek past EOF: new_pos={} len={} current_pos={}",
75 self.new_pos, self.len, self.current_pos
76 )
77 }
78}
79
80impl StdError for StreamSeekPastEof {}
81
82impl fmt::Display for StreamReadError {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 Self::Source(e) => write!(f, "source error: {e}"),
86 }
87 }
88}
89
90impl StdError for StreamReadError {
91 fn source(&self) -> Option<&(dyn StdError + 'static)> {
93 match self {
94 Self::Source(e) => Some(e),
95 }
96 }
97}
98
99#[derive(Debug, Clone, Copy)]
113pub struct StreamPending {
114 pub len: Option<u64>,
115 pub reason: PendingReason,
116 pub phase: SourcePhase,
117 pub flushing: bool,
118 pub variant_fence: bool,
119 pub epoch: u64,
120 pub pos: u64,
121 pub want: usize,
122}
123
124impl fmt::Display for StreamPending {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 write!(
127 f,
128 "{}: pos={} want={} len={:?} phase={:?} epoch={} flushing={} variant_fence={}",
129 self.reason,
130 self.pos,
131 self.want,
132 self.len,
133 self.phase,
134 self.epoch,
135 self.flushing,
136 self.variant_fence,
137 )
138 }
139}
140
141impl StdError for StreamPending {}
142
143#[derive(Debug, Clone, Copy)]
149pub struct VariantChangeError;
150
151impl fmt::Display for VariantChangeError {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 f.write_str("variant change: decoder recreation required")
154 }
155}
156
157impl StdError for VariantChangeError {}
158
159use crate::{
160 MediaInfo, SourcePhase, SourceSeekAnchor, Timeline,
161 error::{SourceError, StreamError},
162 source::{NotReadyCause, PendingReason, ReadOutcome, Source},
163};
164
165pub trait StreamType: MaybeSend + 'static {
172 type Config: Default + MaybeSend;
174
175 type Events: Clone + MaybeSend + MaybeSync + 'static;
180
181 type Source: Source;
183
184 fn create(config: Self::Config) -> impl Future<Output = Result<Self::Source, SourceError>>;
188
189 fn event_bus(config: &Self::Config) -> Option<Self::Events> {
194 let _ = config;
195 None
196 }
197}
198
199pub struct Stream<T: StreamType> {
205 source: T::Source,
206}
207
208impl<T: StreamType> Stream<T> {
209 pub async fn new(config: T::Config) -> Result<Self, SourceError> {
215 let source = T::create(config).await?;
216 task::yield_now().await;
217 Ok(Self { source })
218 }
219
220 pub fn is_empty(&self) -> Option<bool> {
221 self.len().map(|len| len == 0)
222 }
223
224 pub fn position(&self) -> u64 {
226 self.source.position()
227 }
228
229 pub fn seek_time_anchor(
237 &mut self,
238 position: Duration,
239 ) -> Result<Option<SourceSeekAnchor>, io::Error> {
240 self.source
241 .seek_time_anchor(position)
242 .map_err(|e| IoError::other(e.to_string()))
243 }
244
245 pub fn source(&self) -> &T::Source {
247 &self.source
248 }
249
250 pub fn timeline(&self) -> Timeline {
252 self.source.timeline()
253 }
254
255 delegate::delegate! {
256 to self.source {
257 pub fn phase(&self) -> SourcePhase;
259 pub fn phase_at(&self, range: Range<u64>) -> SourcePhase;
261 pub fn media_info(&self) -> Option<MediaInfo>;
263 pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle>;
265 pub fn current_variant(&self) -> Option<kithara_events::VariantInfo>;
267 pub fn len(&self) -> Option<u64>;
269 pub fn current_segment_range(&self) -> Option<Range<u64>>;
272 pub fn format_change_segment_range(&self) -> crate::error::StreamResult<Range<u64>>;
281 pub fn clear_variant_fence(&mut self);
283 pub fn has_variant_change_pending(&self) -> bool;
286 pub fn set_seek_epoch(&mut self, seek_epoch: u64);
288 pub fn notify_waiting(&self);
290 pub fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>>;
292 pub fn commit_seek_landing(&mut self, anchor: Option<SourceSeekAnchor>);
294 pub fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks>;
296 pub fn as_segment_layout(&self) -> Option<Arc<dyn crate::SegmentLayout>>;
298 pub fn set_position(&self, pos: u64);
301 }
302 }
303}
304
305impl<T: StreamType> Stream<T> {
306 const MAX_WAIT_SPINS: u32 = 50;
312
313 const SEEK_WAIT_TIMEOUT: Duration = Duration::from_millis(500);
320
321 const WAIT_RANGE_TIMEOUT: Duration = Duration::from_millis(10);
326
327 #[cfg_attr(feature = "perf", hotpath::measure)]
334 #[kithara::hang_watchdog]
335 pub fn try_read(&mut self, buf: &mut [u8]) -> Result<StreamReadOutcome, StreamReadError> {
336 if buf.is_empty() {
337 return Ok(StreamReadOutcome::Eof {
338 byte_position: self.source.position(),
339 });
340 }
341
342 let mut wait_spins = 0u32;
343
344 loop {
345 let timeline = self.source.timeline();
346 let read_epoch = timeline.seek_epoch();
347 let pos = self.source.position();
348 let range = pos..pos.saturating_add(buf.len() as u64);
349
350 if self.source.has_variant_change_pending() {
352 return Ok(StreamReadOutcome::Pending(PendingReason::VariantChange));
353 }
354
355 let wait_result = self
356 .source
357 .wait_range(range, Some(Self::WAIT_RANGE_TIMEOUT));
358 let wait_outcome = match wait_result {
359 Ok(outcome) => outcome,
360 Err(StreamError::Source(SourceError::WaitBudgetExceeded)) => {
361 if timeline.is_flushing() || timeline.seek_epoch() != read_epoch {
362 return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
363 }
364 wait_spins += 1;
365 if wait_spins >= Self::MAX_WAIT_SPINS {
366 return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
367 NotReadyCause::WaitBudgetExhausted,
368 )));
369 }
370 hang_tick!();
371 yield_now();
372 continue;
373 }
374 Err(e) => {
375 return Err(StreamReadError::Source(IoError::other(e.to_string())));
376 }
377 };
378 match wait_outcome {
379 WaitOutcome::Ready => {}
380 WaitOutcome::Eof => {
381 return Ok(StreamReadOutcome::Eof { byte_position: pos });
382 }
383 WaitOutcome::Interrupted => {
384 if !timeline.is_flushing() {
385 wait_spins += 1;
386 if wait_spins >= Self::MAX_WAIT_SPINS {
387 return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
388 NotReadyCause::WaitInterrupted,
389 )));
390 }
391 hang_tick!();
392 yield_now();
393 continue;
394 }
395 return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
396 }
397 }
398
399 wait_spins = 0;
400
401 if timeline.seek_epoch() != read_epoch {
402 return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
403 }
404
405 match self
406 .source
407 .read_at(pos, buf)
408 .map_err(|e| StreamReadError::Source(IoError::other(e.to_string())))?
409 {
410 ReadOutcome::Bytes(count) => {
411 if timeline.seek_epoch() != read_epoch {
412 return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
413 }
414 hang_reset!();
415 timeline.set_segment_position(pos);
416 self.source.advance(count.get() as u64);
417 let new_pos = self.source.position();
418 return Ok(StreamReadOutcome::Bytes {
419 count,
420 byte_position: new_pos,
421 });
422 }
423 ReadOutcome::Eof => {
424 return Ok(StreamReadOutcome::Eof { byte_position: pos });
425 }
426 ReadOutcome::Pending(PendingReason::Retry) => {
427 hang_tick!();
428 yield_now();
429 continue;
430 }
431 ReadOutcome::Pending(reason) => {
432 return Ok(StreamReadOutcome::Pending(reason));
433 }
434 }
435 }
436 }
437}
438
439impl<T: StreamType> Read for Stream<T> {
440 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
441 match self.try_read(buf) {
442 Ok(StreamReadOutcome::Bytes { count, .. }) => Ok(count.get()),
443 Ok(StreamReadOutcome::Eof { .. }) => Ok(0),
444 Ok(StreamReadOutcome::Pending(reason @ PendingReason::SeekPending)) => {
445 Err(IoError::new(ErrorKind::Interrupted, reason))
446 }
447 Ok(StreamReadOutcome::Pending(
448 reason @ (PendingReason::NotReady(_) | PendingReason::Retry),
449 )) => Err(IoError::new(
450 ErrorKind::Interrupted,
451 self.snapshot_pending(reason, buf.len()),
452 )),
453 Ok(StreamReadOutcome::Pending(PendingReason::VariantChange)) => {
454 Err(IoError::other(VariantChangeError))
455 }
456 Err(StreamReadError::Source(e)) => Err(e),
457 }
458 }
459}
460
461impl<T: StreamType> Stream<T> {
462 fn snapshot_pending(&self, reason: PendingReason, want: usize) -> StreamPending {
470 let pos = self.source.position();
471 let len = self.source.len();
472 let phase = self.source.phase_at(pos..pos.saturating_add(want as u64));
473 let timeline = self.source.timeline();
474 StreamPending {
475 reason,
476 pos,
477 want,
478 len,
479 phase,
480 epoch: timeline.seek_epoch(),
481 flushing: timeline.is_flushing(),
482 variant_fence: self.source.has_variant_change_pending(),
483 }
484 }
485}
486
487impl<T: StreamType> Seek for Stream<T> {
488 #[cfg_attr(feature = "perf", hotpath::measure)]
489 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
490 let current = self.source.position();
491
492 let new_pos: i128 = match pos {
493 SeekFrom::Start(p) => i128::from(p),
494 SeekFrom::Current(delta) => i128::from(current).saturating_add(i128::from(delta)),
495 SeekFrom::End(delta) => {
496 if self.source.len().is_none() {
497 let _ = self.source.wait_range(0..1, None);
498 }
499 let Some(len) = self.source.len() else {
500 return Err(IoError::new(
501 ErrorKind::Unsupported,
502 "seek from end requires known length",
503 ));
504 };
505 i128::from(len).saturating_add(i128::from(delta))
506 }
507 };
508
509 if new_pos < 0 {
510 return Err(IoError::new(
511 ErrorKind::InvalidInput,
512 "negative seek position",
513 ));
514 }
515
516 let new_pos = u64::try_from(new_pos).unwrap_or(u64::MAX);
517
518 let wait_range = match self.source.format_change_segment_range() {
519 Ok(range) if range.start == new_pos => range,
520 _ => new_pos..new_pos.saturating_add(1),
521 };
522 let _ = self
523 .source
524 .wait_range(wait_range, Some(Self::SEEK_WAIT_TIMEOUT));
525
526 if let Some(len) = self.source.len()
527 && new_pos > len
528 {
529 return Err(IoError::new(
530 ErrorKind::InvalidInput,
531 StreamSeekPastEof {
532 new_pos,
533 len,
534 current_pos: current,
535 },
536 ));
537 }
538
539 self.source.set_position(new_pos);
540 Ok(new_pos)
541 }
542}
543
544#[cfg(test)]
545mod tests {
546 use std::{
547 collections::VecDeque,
548 sync::{
549 Arc,
550 atomic::{AtomicU64, Ordering},
551 },
552 };
553
554 use kithara_storage::WaitOutcome;
555 use kithara_test_utils::kithara;
556
557 use super::*;
558 use crate::{ReadOutcome, Source, SourcePhase};
559
560 #[derive(Clone, Copy)]
565 enum ScriptRead {
566 Data(usize),
567 Eof,
568 }
569
570 fn bytes(count: usize) -> ReadOutcome {
571 let nz = NonZeroUsize::new(count)
572 .expect("BUG: ScriptSource::bytes invariant — count must be > 0");
573 ReadOutcome::Bytes(nz)
574 }
575
576 struct ScriptSource {
577 position: Arc<AtomicU64>,
578 anchor: Option<SourceSeekAnchor>,
579 timeline: Timeline,
580 data: Vec<u8>,
581 reads: VecDeque<ScriptRead>,
582 waits: VecDeque<WaitOutcome>,
583 }
584
585 impl ScriptSource {
586 fn new(
587 timeline: Timeline,
588 waits: impl IntoIterator<Item = WaitOutcome>,
589 reads: impl IntoIterator<Item = ScriptRead>,
590 data: Vec<u8>,
591 ) -> Self {
592 Self {
593 timeline,
594 data,
595 position: Arc::new(AtomicU64::new(0)),
596 anchor: None,
597 reads: reads.into_iter().collect(),
598 waits: waits.into_iter().collect(),
599 }
600 }
601 }
602
603 impl Source for ScriptSource {
604 fn advance(&self, n: u64) {
605 self.position.fetch_add(n, Ordering::AcqRel);
606 }
607
608 fn len(&self) -> Option<u64> {
609 Some(self.data.len() as u64)
610 }
611
612 fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
613 SourcePhase::Waiting
614 }
615
616 fn position(&self) -> u64 {
617 self.position.load(Ordering::Acquire)
618 }
619
620 fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
621 let step = self.reads.pop_front().unwrap_or(ScriptRead::Eof);
622 match step {
623 ScriptRead::Eof => Ok(ReadOutcome::Eof),
624 ScriptRead::Data(n) => {
625 let Ok(start) = usize::try_from(offset) else {
626 return Ok(ReadOutcome::Eof);
627 };
628 let end = (start + n).min(self.data.len());
629 let bytes_count = end.saturating_sub(start).min(buf.len());
630 if bytes_count == 0 {
631 return Ok(ReadOutcome::Eof);
632 }
633 buf[..bytes_count].copy_from_slice(&self.data[start..start + bytes_count]);
634 Ok(bytes(bytes_count))
635 }
636 }
637 }
638
639 fn seek_time_anchor(
640 &mut self,
641 _position: Duration,
642 ) -> crate::StreamResult<Option<SourceSeekAnchor>> {
643 Ok(self.anchor)
644 }
645
646 fn set_position(&self, pos: u64) {
647 self.position.store(pos, Ordering::Release);
648 }
649
650 fn timeline(&self) -> Timeline {
651 self.timeline.clone()
652 }
653
654 fn wait_range(
655 &mut self,
656 _range: Range<u64>,
657 _timeout: Option<Duration>,
658 ) -> crate::StreamResult<WaitOutcome> {
659 Ok(self.waits.pop_front().unwrap_or(WaitOutcome::Ready))
660 }
661 }
662
663 struct DummyType;
664
665 impl StreamType for DummyType {
666 type Config = ();
667 type Events = ();
668 type Source = ScriptSource;
669
670 async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
671 Err(SourceError::other(IoError::other("not used in unit tests")))
672 }
673 }
674
675 struct SeekDuringWaitType;
676
677 impl StreamType for SeekDuringWaitType {
678 type Config = ();
679 type Events = ();
680 type Source = SeekDuringWaitSource;
681
682 async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
683 Err(SourceError::other(IoError::other("not used in unit tests")))
684 }
685 }
686
687 struct SeekDuringWaitSource {
688 position: Arc<AtomicU64>,
689 timeline: Timeline,
690 read_calls: usize,
691 }
692
693 impl Source for SeekDuringWaitSource {
694 fn advance(&self, n: u64) {
695 self.position.fetch_add(n, Ordering::AcqRel);
696 }
697
698 fn len(&self) -> Option<u64> {
699 Some(4)
700 }
701
702 fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
703 SourcePhase::Ready
704 }
705
706 fn position(&self) -> u64 {
707 self.position.load(Ordering::Acquire)
708 }
709
710 fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
711 self.read_calls += 1;
712 Ok(bytes(4))
713 }
714
715 fn set_position(&self, pos: u64) {
716 self.position.store(pos, Ordering::Release);
717 }
718
719 fn timeline(&self) -> Timeline {
720 self.timeline.clone()
721 }
722
723 fn wait_range(
724 &mut self,
725 _range: Range<u64>,
726 _timeout: Option<Duration>,
727 ) -> crate::StreamResult<WaitOutcome> {
728 let _ = self.timeline.initiate_seek(Duration::from_millis(10));
729 Ok(WaitOutcome::Ready)
730 }
731 }
732
733 #[kithara::test]
734 fn read_retries_interrupted_when_not_flushing() {
735 let timeline = Timeline::new();
736 let source = ScriptSource::new(
737 timeline.clone(),
738 [WaitOutcome::Interrupted, WaitOutcome::Ready],
739 [ScriptRead::Data(4)],
740 b"ABCD".to_vec(),
741 );
742 let mut stream = Stream::<DummyType> { source };
743 let mut buf = [0u8; 4];
744
745 let n = stream
746 .read(&mut buf)
747 .expect("BUG: read must succeed after the explicit retry in this test scenario");
748 assert_eq!(n, 4);
749 assert_eq!(&buf, b"ABCD");
750 }
751
752 #[kithara::test]
753 fn try_read_returns_seek_pending_when_flushing() {
754 let timeline = Timeline::new();
755 let _ = timeline.initiate_seek(Duration::from_millis(10));
756 let source = ScriptSource::new(timeline.clone(), [WaitOutcome::Interrupted], [], vec![]);
757 let mut stream = Stream::<DummyType> { source };
758 let mut buf = [0u8; 4];
759
760 let outcome = stream
761 .try_read(&mut buf)
762 .expect("BUG: seek-pending is a status return; not a hard error in this test");
763 assert!(matches!(
764 outcome,
765 StreamReadOutcome::Pending(PendingReason::SeekPending)
766 ));
767 }
768
769 #[kithara::test]
770 fn try_read_returns_seek_pending_when_epoch_changes_after_wait() {
771 let timeline = Timeline::new();
772 let source = SeekDuringWaitSource {
773 timeline: timeline.clone(),
774 position: Arc::new(AtomicU64::new(0)),
775 read_calls: 0,
776 };
777 let mut stream = Stream::<SeekDuringWaitType> { source };
778 let mut buf = [0u8; 4];
779
780 let outcome = stream
781 .try_read(&mut buf)
782 .expect("BUG: seek-pending is a status return; not a hard error in this test");
783
784 assert!(matches!(
785 outcome,
786 StreamReadOutcome::Pending(PendingReason::SeekPending)
787 ));
788 assert_eq!(stream.source.read_calls, 0);
789 assert_eq!(stream.position(), 0);
790 }
791
792 #[kithara::test]
793 fn seek_updates_position() {
794 let timeline = Timeline::new();
795 let source = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
796 let mut stream = Stream::<DummyType> { source };
797
798 let pos = stream
799 .seek(SeekFrom::Start(3))
800 .expect("BUG: seek to a position within the test stream must succeed");
801
802 assert_eq!(pos, 3);
803 assert_eq!(stream.position(), 3);
804 }
805
806 #[kithara::test]
807 fn seek_time_anchor_does_not_move_position() {
808 let timeline = Timeline::new();
809 let mut source = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
810 source.set_position(11);
811 source.anchor = Some(SourceSeekAnchor {
812 byte_offset: 3,
813 segment_start: Duration::from_secs(8),
814 segment_end: Some(Duration::from_secs(12)),
815 segment_index: Some(2),
816 variant_index: Some(1),
817 });
818 let mut stream = Stream::<DummyType> { source };
819
820 let anchor = stream
821 .seek_time_anchor(Duration::from_millis(8_500))
822 .expect("BUG: anchor resolution must succeed for the constructed test stream")
823 .expect("BUG: stream must return the resolved anchor in this test");
824
825 assert_eq!(anchor.byte_offset, 3);
826 assert_eq!(
827 stream.position(),
828 11,
829 "anchor resolution must not eagerly commit stream position"
830 );
831 }
832}