use std::{num::NonZeroUsize, sync::Arc};
use kithara_assets::{AssetResource, AssetStoreBuilder, ResourceKey};
use kithara_events::EventBus;
use kithara_platform::time::Duration;
use kithara_storage::{ResourceExt, WaitOutcome};
use kithara_stream::{ReadOutcome, Source, SourcePhase, Timeline};
use kithara_test_utils::kithara;
use tokio_util::sync::CancellationToken;
use super::source::FileSource;
use crate::coord::FileCoord;
/// Wraps `n` in [`ReadOutcome::Bytes`] so it can be used as an expected value
/// in assertions.
///
/// # Panics
///
/// Panics if `n` is zero, because the count is stored as a [`NonZeroUsize`].
fn nz_bytes(n: usize) -> ReadOutcome {
    let count = NonZeroUsize::new(n).expect("test: byte count must be > 0");
    ReadOutcome::Bytes(count)
}
/// Creates a [`FileCoord`] over `timeline` and hands it back behind an [`Arc`].
fn make_coord(timeline: Timeline) -> Arc<FileCoord> {
    let coord = FileCoord::new(timeline);
    Arc::new(coord)
}
/// Builds a [`FileSource`] through `FileSource::local` for the given resource,
/// coordinator and event bus.
///
/// The asset store backend is built with no asset root, the resource key is
/// always `"test-source"`, and the store and the source each receive their own
/// freshly created `CancellationToken`.
fn make_source(res: AssetResource, coord: Arc<FileCoord>, bus: EventBus) -> FileSource {
    let store = AssetStoreBuilder::new()
        .asset_root(None)
        .cancel(CancellationToken::new())
        .build();
    let backend = Arc::new(store);
    let key = ResourceKey::new("test-source");
    let cancel = CancellationToken::new();
    // NOTE(review): the meaning of the trailing `None` argument is not visible
    // from this file — confirm against `FileSource::local`.
    FileSource::local(res, coord, bus, backend, key, cancel, None)
}
// A freshly constructed `FileCoord` must report a read position of zero.
#[kithara::test]
fn test_file_coord_initial_state() {
    let timeline = Timeline::new();
    let coord = FileCoord::new(timeline);
    let initial_read_pos = coord.read_pos();
    assert_eq!(initial_read_pos, 0);
}
// Exercises the two position setters on `FileCoord`:
// - `read` case: `set_read_pos` is reflected by `read_pos`.
// - `download` case: `set_download_pos` leaves `read_pos` at zero.
#[kithara::test]
#[case::read(100, true)]
#[case::download(500, false)]
fn test_file_coord_set_and_get_positions(#[case] value: u64, #[case] read_pos: bool) {
    let timeline = Timeline::new();
    let coord = FileCoord::new(timeline);
    if !read_pos {
        // Moving the download position must not touch the reader cursor.
        coord.set_download_pos(value);
        let observed = coord.read_pos();
        assert_eq!(
            observed, 0,
            "download position is orthogonal to read position"
        );
    } else {
        coord.set_read_pos(value);
        let observed = coord.read_pos();
        assert_eq!(observed, value);
    }
}
// `total_bytes` starts out as `None`, reflects a value written through
// `set_total_bytes`, and can be reset back to `None`.
#[kithara::test]
fn file_coord_total_bytes_roundtrip() {
    let timeline = Timeline::new();
    let coord = FileCoord::new(timeline);

    let before = coord.total_bytes();
    assert_eq!(before, None);

    coord.set_total_bytes(Some(123));
    let stored = coord.total_bytes();
    assert_eq!(stored, Some(123));

    coord.set_total_bytes(None);
    let cleared = coord.total_bytes();
    assert_eq!(cleared, None);
}
/// Acquires the `"test.dat"` resource from an ephemeral asset store rooted at
/// `"test"`, writes `data` at offset 0 and commits it with `data.len()` as the
/// final length.
///
/// # Panics
///
/// Panics if acquiring, writing or committing the resource fails.
fn create_committed_resource(data: &[u8]) -> AssetResource {
    let assets = AssetStoreBuilder::new()
        .ephemeral(true)
        .asset_root(Some("test"))
        .cancel(CancellationToken::new())
        .build();
    let key = ResourceKey::new("test.dat");
    let final_len = data.len() as u64;

    let resource = assets.acquire_resource(&key).unwrap();
    resource.write_at(0, data).unwrap();
    resource.commit(Some(final_len)).unwrap();
    resource
}
// `Source::read_at` returns the requested bytes at the given offset and leaves
// the coordinator's read position untouched.
#[kithara::test]
fn test_file_source_read_at() {
    let payload = b"hello world from kithara";
    let resource = create_committed_resource(payload);
    let coord = make_coord(Timeline::new());
    coord.set_total_bytes(Some(payload.len() as u64));
    let mut source = make_source(resource, Arc::clone(&coord), EventBus::new(16));

    // Read from the very start of the resource.
    let mut head = [0u8; 11];
    let first = Source::read_at(&mut source, 0, &mut head).unwrap();
    assert_eq!(first, nz_bytes(11));
    assert_eq!(&head[..], b"hello world");
    assert_eq!(
        coord.read_pos(),
        0,
        "read_at must not advance the reader cursor outside Stream::read"
    );

    // Read from an interior offset.
    let mut middle = [0u8; 7];
    let second = Source::read_at(&mut source, 6, &mut middle).unwrap();
    assert_eq!(second, nz_bytes(7));
    assert_eq!(&middle[..], b"world f");
    assert_eq!(coord.read_pos(), 0);
}
// `Source::len` reports the total byte count set on the coordinator (12345),
// even though only three bytes were committed to the resource.
#[kithara::test]
fn test_file_source_len() {
    let resource = create_committed_resource(b"abc");
    let coord = make_coord(Timeline::new());
    coord.set_total_bytes(Some(12345));
    let source = make_source(resource, coord, EventBus::new(16));

    let reported = Source::len(&source);
    assert_eq!(reported, Some(12345));
}
// `phase_at` for a fully committed resource:
// - a range inside the committed data is `Ready`;
// - a range past the known total length is `Eof`.
#[kithara::test]
#[case::ready_when_range_present(b"hello world", 11, 0..5, SourcePhase::Ready)]
#[case::eof_past_known_length(b"abc", 3, 100..110, SourcePhase::Eof)]
fn file_source_phase_at_range(
    #[case] data: &[u8],
    #[case] total_bytes: u64,
    #[case] range: std::ops::Range<u64>,
    #[case] expected: SourcePhase,
) {
    let resource = create_committed_resource(data);
    let coord = make_coord(Timeline::new());
    coord.set_total_bytes(Some(total_bytes));
    let source = make_source(resource, coord, EventBus::new(16));

    let actual = source.phase_at(range);
    assert_eq!(actual, expected);
}
// `phase_at` after a seek has been initiated on the shared timeline:
// - a range with no committed data reports `Seeking`;
// - a range that is already present still reports `Ready`.
#[kithara::test]
#[case::seeking_when_data_not_ready(100, 50..60, SourcePhase::Seeking)]
#[case::ready_beats_seeking_when_data_present(11, 0..5, SourcePhase::Ready)]
fn file_source_phase_during_seek(
    #[case] total_bytes: u64,
    #[case] range: std::ops::Range<u64>,
    #[case] expected: SourcePhase,
) {
    let payload = b"hello world";
    let resource = create_committed_resource(payload);
    let timeline = Timeline::new();
    let coord = make_coord(timeline.clone());
    coord.set_total_bytes(Some(total_bytes));
    let source = make_source(resource, coord, EventBus::new(16));

    // The value returned by `initiate_seek` is deliberately discarded right away.
    let _ = timeline.initiate_seek(Duration::from_secs(0));

    let actual = source.phase_at(range);
    assert_eq!(actual, expected);
}
// `Source::phase` with only the first 16 of 64 declared bytes committed:
// position 0 is `Ready`, position 32 is `Waiting`, position 64 is `Eof`.
#[kithara::test]
#[case::ready_when_current_byte_is_available(0, SourcePhase::Ready)]
#[case::waiting_when_current_byte_is_missing(32, SourcePhase::Waiting)]
#[case::eof_at_end(64, SourcePhase::Eof)]
fn file_source_phase_parameterless(#[case] position: u64, #[case] expected: SourcePhase) {
    let full = [0xABu8; 64];
    let committed = &full[..16];
    let resource = create_committed_resource(committed);
    let coord = make_coord(Timeline::new());
    coord.set_total_bytes(Some(full.len() as u64));
    // Position zero is left untouched; it is never set explicitly.
    if position != 0 {
        coord.set_position(position);
    }
    let source = make_source(resource, coord, EventBus::new(16));

    let actual = Source::phase(&source);
    assert_eq!(actual, expected);
}
// After a seek has been initiated on the timeline, `Source::wait_range` for a
// range with no committed data returns `WaitOutcome::Interrupted`.
#[kithara::test]
fn file_source_wait_range_returns_interrupted_while_flushing() {
    let payload = b"hello world from kithara";
    let resource = create_committed_resource(payload);
    let timeline = Timeline::new();
    let coord = make_coord(timeline.clone());
    // Declared length (100) is larger than the committed payload.
    coord.set_total_bytes(Some(100));
    let mut source = make_source(resource, coord, EventBus::new(16));

    // The value returned by `initiate_seek` is deliberately discarded right away.
    let _ = timeline.initiate_seek(Duration::from_secs(0));

    let timeout = Some(Duration::from_secs(1));
    let outcome = Source::wait_range(&mut source, 50..60, timeout).unwrap();
    assert_eq!(outcome, WaitOutcome::Interrupted);
}
// `Source::read_at` changes neither `coord.read_pos()` nor `Source::position`,
// and moving the read position through `set_read_pos` leaves `Source::position`
// at zero.
#[kithara::test]
fn file_source_read_at_does_not_advance_timeline_position() {
    let resource = create_committed_resource(b"abcdef");
    let timeline = Timeline::new();
    let coord = make_coord(timeline.clone());
    coord.set_total_bytes(Some(6));
    let mut source = make_source(resource, Arc::clone(&coord), EventBus::new(16));

    let mut scratch = [0u8; 2];
    let outcome = Source::read_at(&mut source, 0, &mut scratch).unwrap();
    assert_eq!(outcome, nz_bytes(2));
    assert_eq!(coord.read_pos(), 0);
    assert_eq!(Source::position(&source), 0);

    // Moving the reader cursor directly must not be visible via `Source::position`.
    coord.set_read_pos(5);
    assert_eq!(coord.read_pos(), 5);
    assert_eq!(Source::position(&source), 0);
}