use std::sync::{
Arc, OnceLock,
atomic::{AtomicU8, Ordering},
};
use kithara_assets::{AssetResource, AssetStore, ResourceKey};
use kithara_events::EventBus;
use kithara_net::Headers;
use kithara_storage::ResourceExt;
use kithara_stream::MediaInfo;
use tokio_util::sync::CancellationToken;
use url::Url;
use super::segments::FileSegmentIndex;
use crate::{coord::FileCoord, error::SourceError};
#[derive(Debug, Clone)]
pub(crate) struct FileStreamState {
pub(crate) backend: Arc<AssetStore>,
pub(crate) res: AssetResource,
pub(crate) bus: EventBus,
pub(crate) key: ResourceKey,
}
impl FileStreamState {
pub(crate) fn create(
assets: &Arc<AssetStore>,
url: &Url,
bus: Option<EventBus>,
event_channel_capacity: usize,
) -> Result<Self, SourceError> {
let key = ResourceKey::from(url);
let res = assets.acquire_resource(&key).map_err(SourceError::Assets)?;
let bus = bus.unwrap_or_else(|| EventBus::new(event_channel_capacity));
Ok(Self {
bus,
key,
res,
backend: Arc::clone(assets),
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum FilePhase {
Init = 0,
Downloading = 1,
Complete = 2,
}
pub(crate) struct FileSourceCtx {
pub(crate) coord: Arc<FileCoord>,
pub(crate) cancel: CancellationToken,
pub(crate) bus: EventBus,
}
pub(crate) struct FileAssetCtx {
pub(crate) backend: Arc<AssetStore>,
pub(crate) res: AssetResource,
pub(crate) headers: Option<Headers>,
pub(crate) key: ResourceKey,
pub(crate) url: Url,
}
pub(crate) struct FileInner {
pub(crate) asset: FileAssetCtx,
pub(crate) source: FileSourceCtx,
pub(crate) content_type_info: OnceLock<MediaInfo>,
pub(crate) segment_index: OnceLock<FileSegmentIndex>,
phase: AtomicU8,
}
impl FileInner {
pub(crate) fn new(
source: FileSourceCtx,
asset: FileAssetCtx,
initial_phase: FilePhase,
) -> Self {
let inner = Self {
source,
asset,
content_type_info: OnceLock::new(),
segment_index: OnceLock::new(),
phase: AtomicU8::new(initial_phase as u8),
};
if matches!(initial_phase, FilePhase::Complete) {
inner.try_build_segment_index();
}
inner
}
pub(crate) fn fail_and_evict(&self, reason: &str) {
self.set_phase(FilePhase::Complete);
self.asset.res.fail(reason.to_string());
self.asset.backend.remove_resource(&self.asset.key);
}
pub(crate) fn set_phase(&self, phase: FilePhase) {
self.phase.store(phase as u8, Ordering::Release);
if matches!(phase, FilePhase::Complete) {
self.try_build_segment_index();
}
}
fn try_build_segment_index(&self) {
if self.segment_index.get().is_some() {
return;
}
let Some(total) = self.asset.res.len() else {
return;
};
if total == 0 || !self.asset.res.contains_range(0..total) {
return;
}
let Ok(total_usize) = usize::try_from(total) else {
return;
};
let mut buf: Box<[u8]> = std::iter::repeat_n(0u8, total_usize).collect();
if self.asset.res.read_at(0, &mut buf).is_err() {
return;
}
if let Some(index) = FileSegmentIndex::try_build(&buf) {
let _ = self.segment_index.set(index);
}
}
}