1use std::{path::PathBuf, sync::Arc};
2
3use kithara_assets::{
4 AssetResource, AssetStoreBuilder, AssetsError, EvictConfig, ResourceKey, asset_root_for_url,
5};
6use kithara_events::EventBus;
7use kithara_platform::{time::Duration, tokio};
8use kithara_storage::{ResourceExt, ResourceStatus, StorageError};
9use kithara_stream::{
10 AudioCodec, SourceError as StreamSourceError, StreamType, Timeline,
11 dl::{Downloader, DownloaderConfig},
12};
13use kithara_test_utils::kithara;
14use tokio_util::sync::CancellationToken;
15
16use crate::{
17 config::{FileConfig, FileSrc},
18 coord::FileCoord,
19 error::SourceError,
20 session::{
21 FileAssetCtx, FileInner, FilePeer, FilePhase, FileSource, FileSourceCtx, FileStreamState,
22 },
23};
24
/// Marker type that implements [`StreamType`] for file-backed streams.
///
/// The configured source is either a local path or a remote URL; see the
/// `StreamType::create` implementation for how each is opened.
pub struct File;
27
/// Delay between attempts to open a remote resource while the asset store
/// keeps answering with `StorageError::TmpClaimed`.
const TMP_CLAIMED_POLL_INTERVAL: Duration = Duration::from_millis(10);
34
35impl StreamType for File {
36 type Config = FileConfig;
37 type Events = EventBus;
38 type Source = FileSource;
39
40 async fn create(config: Self::Config) -> Result<Self::Source, StreamSourceError> {
41 let cancel = config.cancel.clone().unwrap_or_default();
42 let src = config.src.clone();
43
44 match src {
45 FileSrc::Local(path) => {
46 Self::create_local(path, config, &cancel).map_err(StreamSourceError::from)
47 }
48 FileSrc::Remote(url) => Self::create_remote_wait_for_claim(url, config, cancel).await,
49 }
50 }
51
52 fn event_bus(config: &Self::Config) -> Option<Self::Events> {
53 config.bus.clone()
54 }
55}
56
57impl File {
58 fn create_local(
60 path: PathBuf,
61 config: FileConfig,
62 cancel: &CancellationToken,
63 ) -> Result<FileSource, SourceError> {
64 if !path.exists() {
65 return Err(SourceError::InvalidPath(format!(
66 "file not found: {}",
67 path.display()
68 )));
69 }
70
71 let store = Arc::new(
72 AssetStoreBuilder::new()
73 .asset_root(None)
74 .cancel(cancel.clone())
75 .build(),
76 );
77
78 let key = ResourceKey::absolute(path);
79 let res = store.open_resource(&key).map_err(SourceError::Assets)?;
80 let len = res.len();
81
82 let bus = config
83 .bus
84 .unwrap_or_else(|| EventBus::new(config.event_channel_capacity));
85
86 let timeline = Timeline::new();
87 let coord = Arc::new(FileCoord::new(timeline));
88 coord.set_total_bytes(len);
89 let total = len.unwrap_or(0);
90 coord.set_download_pos(total);
91
92 let cached_codec = sniff_codec(&res);
93
94 Ok(FileSource::local(
95 res,
96 coord,
97 bus,
98 store,
99 key,
100 cancel.child_token(),
101 cached_codec,
102 ))
103 }
104
105 fn create_remote(
112 url: url::Url,
113 config: FileConfig,
114 cancel: CancellationToken,
115 ) -> Result<FileSource, SourceError> {
116 let from_config = config.name.as_deref();
117 let from_query = url.query();
118 let name_or_query = from_config.or(from_query).filter(|s| !s.is_empty());
119 let asset_root = asset_root_for_url(&url, name_or_query);
120
121 let downloader = config.downloader.clone().unwrap_or_else(|| {
122 let cancel_for_dl = cancel.child_token();
123 let client = kithara_net::HttpClient::new(
124 kithara_net::NetOptions::default(),
125 cancel_for_dl.child_token(),
126 );
127 Downloader::new(
128 DownloaderConfig::for_client(client)
129 .cancel(cancel_for_dl)
130 .build(),
131 )
132 });
133
134 let backend_builder = AssetStoreBuilder::new()
135 .root_dir(&config.store.cache_dir)
136 .asset_root(Some(asset_root.as_str()))
137 .evict_config(EvictConfig::from(&config.store))
138 .ephemeral(config.store.is_ephemeral)
139 .cancel(cancel.clone());
140 let backend = Arc::new(backend_builder.build());
141
142 let state = FileStreamState::create(
143 &backend,
144 &url,
145 config.bus.clone(),
146 config.event_channel_capacity,
147 )?;
148
149 let timeline = Timeline::new();
150 let coord = Arc::new(FileCoord::new(timeline));
151 coord.set_total_bytes(state.res.len());
152
153 if matches!(state.res.status(), ResourceStatus::Committed { .. }) {
154 tracing::debug!("file already cached, skipping download");
155 let total = coord.total_bytes().unwrap_or(0);
156 coord.set_download_pos(total);
157
158 let cached_codec = sniff_codec(&state.res);
159
160 return Ok(FileSource::local(
161 state.res.clone(),
162 coord,
163 state.bus.clone(),
164 Arc::clone(&state.backend),
165 state.key.clone(),
166 cancel.child_token(),
167 cached_codec,
168 ));
169 }
170
171 let inner = Arc::new(FileInner::new(
172 FileSourceCtx {
173 cancel,
174 coord: Arc::clone(&coord),
175 bus: state.bus.clone(),
176 },
177 FileAssetCtx {
178 url,
179 headers: config.headers,
180 backend: Arc::clone(&state.backend),
181 res: state.res.clone(),
182 key: state.key.clone(),
183 },
184 FilePhase::Init,
185 ));
186
187 let peer_handle = downloader
188 .register(Arc::new(FilePeer::new(Arc::clone(&inner))))
189 .with_bus(state.bus.clone());
190
191 let mut source = FileSource::with_inner(inner, coord);
192 source.set_peer_handle(peer_handle);
193 Ok(source)
194 }
195
196 #[kithara::hang_watchdog]
207 async fn create_remote_wait_for_claim(
208 url: url::Url,
209 config: FileConfig,
210 cancel: CancellationToken,
211 ) -> Result<FileSource, StreamSourceError> {
212 loop {
213 match Self::create_remote(url.clone(), config.clone(), cancel.clone()) {
214 Ok(src) => {
215 hang_reset!();
216 return Ok(src);
217 }
218 Err(SourceError::Assets(AssetsError::Storage(StorageError::TmpClaimed(_)))) => {
219 hang_tick!();
220 tokio::time::sleep(TMP_CLAIMED_POLL_INTERVAL).await;
221 }
222 Err(e) => return Err(StreamSourceError::from(e)),
223 }
224 }
225 }
226}
227
228fn sniff_codec(res: &AssetResource) -> Option<AudioCodec> {
234 let mut buf = [0u8; 16];
235 let read = res.read_at(0, &mut buf).ok()?;
236 AudioCodec::try_from(&buf[..read]).ok()
237}