Skip to main content

kithara_file/
stream.rs

1use std::{path::PathBuf, sync::Arc};
2
3use kithara_assets::{
4    AssetResource, AssetStoreBuilder, AssetsError, EvictConfig, ResourceKey, asset_root_for_url,
5};
6use kithara_events::EventBus;
7use kithara_platform::{time::Duration, tokio};
8use kithara_storage::{ResourceExt, ResourceStatus, StorageError};
9use kithara_stream::{
10    AudioCodec, SourceError as StreamSourceError, StreamType, Timeline,
11    dl::{Downloader, DownloaderConfig},
12};
13use kithara_test_utils::kithara;
14use tokio_util::sync::CancellationToken;
15
16use crate::{
17    config::{FileConfig, FileSrc},
18    coord::FileCoord,
19    error::SourceError,
20    session::{
21        FileAssetCtx, FileInner, FilePeer, FilePhase, FileSource, FileSourceCtx, FileStreamState,
22    },
23};
24
/// Zero-sized marker type that selects the file-backed stream
/// implementation.
///
/// It holds no data of its own; it exists so that [`StreamType`] can be
/// implemented for file sources.
pub struct File;
27
/// Sleep between open attempts while a sibling `AssetStore` instance
/// still holds the atomic-chunked tmp for the same canonical path.
///
/// The value is a compromise: small enough that the ~67 ms race window
/// observed in `local_queue_playlist_behavior` clears after a handful of
/// polls, large enough that the retry loop does not busy-spin a tokio
/// worker.
const TMP_CLAIMED_POLL_INTERVAL: Duration = Duration::from_millis(10);
34
35impl StreamType for File {
36    type Config = FileConfig;
37    type Events = EventBus;
38    type Source = FileSource;
39
40    async fn create(config: Self::Config) -> Result<Self::Source, StreamSourceError> {
41        let cancel = config.cancel.clone().unwrap_or_default();
42        let src = config.src.clone();
43
44        match src {
45            FileSrc::Local(path) => {
46                Self::create_local(path, config, &cancel).map_err(StreamSourceError::from)
47            }
48            FileSrc::Remote(url) => Self::create_remote_wait_for_claim(url, config, cancel).await,
49        }
50    }
51
52    fn event_bus(config: &Self::Config) -> Option<Self::Events> {
53        config.bus.clone()
54    }
55}
56
57impl File {
58    /// Create a source for a local file.
59    fn create_local(
60        path: PathBuf,
61        config: FileConfig,
62        cancel: &CancellationToken,
63    ) -> Result<FileSource, SourceError> {
64        if !path.exists() {
65            return Err(SourceError::InvalidPath(format!(
66                "file not found: {}",
67                path.display()
68            )));
69        }
70
71        let store = Arc::new(
72            AssetStoreBuilder::new()
73                .asset_root(None)
74                .cancel(cancel.clone())
75                .build(),
76        );
77
78        let key = ResourceKey::absolute(path);
79        let res = store.open_resource(&key).map_err(SourceError::Assets)?;
80        let len = res.len();
81
82        let bus = config
83            .bus
84            .unwrap_or_else(|| EventBus::new(config.event_channel_capacity));
85
86        let timeline = Timeline::new();
87        let coord = Arc::new(FileCoord::new(timeline));
88        coord.set_total_bytes(len);
89        let total = len.unwrap_or(0);
90        coord.set_download_pos(total);
91
92        let cached_codec = sniff_codec(&res);
93
94        Ok(FileSource::local(
95            res,
96            coord,
97            bus,
98            store,
99            key,
100            cancel.child_token(),
101            cached_codec,
102        ))
103    }
104
105    /// Create a source for a remote file.
106    ///
107    /// Registers the source with the [`Downloader`] and returns immediately.
108    /// Content-Length and Content-Type are discovered asynchronously via the
109    /// `on_connect` callback when the HTTP response arrives. Until then,
110    /// `len()` returns `None`.
111    fn create_remote(
112        url: url::Url,
113        config: FileConfig,
114        cancel: CancellationToken,
115    ) -> Result<FileSource, SourceError> {
116        let from_config = config.name.as_deref();
117        let from_query = url.query();
118        let name_or_query = from_config.or(from_query).filter(|s| !s.is_empty());
119        let asset_root = asset_root_for_url(&url, name_or_query);
120
121        let downloader = config.downloader.clone().unwrap_or_else(|| {
122            let cancel_for_dl = cancel.child_token();
123            let client = kithara_net::HttpClient::new(
124                kithara_net::NetOptions::default(),
125                cancel_for_dl.child_token(),
126            );
127            Downloader::new(
128                DownloaderConfig::for_client(client)
129                    .cancel(cancel_for_dl)
130                    .build(),
131            )
132        });
133
134        let backend_builder = AssetStoreBuilder::new()
135            .root_dir(&config.store.cache_dir)
136            .asset_root(Some(asset_root.as_str()))
137            .evict_config(EvictConfig::from(&config.store))
138            .ephemeral(config.store.is_ephemeral)
139            .cancel(cancel.clone());
140        let backend = Arc::new(backend_builder.build());
141
142        let state = FileStreamState::create(
143            &backend,
144            &url,
145            config.bus.clone(),
146            config.event_channel_capacity,
147        )?;
148
149        let timeline = Timeline::new();
150        let coord = Arc::new(FileCoord::new(timeline));
151        coord.set_total_bytes(state.res.len());
152
153        if matches!(state.res.status(), ResourceStatus::Committed { .. }) {
154            tracing::debug!("file already cached, skipping download");
155            let total = coord.total_bytes().unwrap_or(0);
156            coord.set_download_pos(total);
157
158            let cached_codec = sniff_codec(&state.res);
159
160            return Ok(FileSource::local(
161                state.res.clone(),
162                coord,
163                state.bus.clone(),
164                Arc::clone(&state.backend),
165                state.key.clone(),
166                cancel.child_token(),
167                cached_codec,
168            ));
169        }
170
171        let inner = Arc::new(FileInner::new(
172            FileSourceCtx {
173                cancel,
174                coord: Arc::clone(&coord),
175                bus: state.bus.clone(),
176            },
177            FileAssetCtx {
178                url,
179                headers: config.headers,
180                backend: Arc::clone(&state.backend),
181                res: state.res.clone(),
182                key: state.key.clone(),
183            },
184            FilePhase::Init,
185        ));
186
187        let peer_handle = downloader
188            .register(Arc::new(FilePeer::new(Arc::clone(&inner))))
189            .with_bus(state.bus.clone());
190
191        let mut source = FileSource::with_inner(inner, coord);
192        source.set_peer_handle(peer_handle);
193        Ok(source)
194    }
195
196    /// Wait for a sibling `AssetStore` to release the atomic-chunked
197    /// tmp file, then open. The sibling owner signals release either by
198    /// committing (canonical appears) or by dropping without commit
199    /// (tmp disappears) — both unblock our next
200    /// `OpenOptions::create_new` call.
201    ///
202    /// Wrapped in `#[kithara::hang_watchdog]` so a stale tmp from a
203    /// crashed-out previous process (which never releases the
204    /// filesystem-level signal) surfaces as a deterministic panic
205    /// rather than an indefinite hang.
206    #[kithara::hang_watchdog]
207    async fn create_remote_wait_for_claim(
208        url: url::Url,
209        config: FileConfig,
210        cancel: CancellationToken,
211    ) -> Result<FileSource, StreamSourceError> {
212        loop {
213            match Self::create_remote(url.clone(), config.clone(), cancel.clone()) {
214                Ok(src) => {
215                    hang_reset!();
216                    return Ok(src);
217                }
218                Err(SourceError::Assets(AssetsError::Storage(StorageError::TmpClaimed(_)))) => {
219                    hang_tick!();
220                    tokio::time::sleep(TMP_CLAIMED_POLL_INTERVAL).await;
221                }
222                Err(e) => return Err(StreamSourceError::from(e)),
223            }
224        }
225    }
226}
227
228/// Probe the first bytes of a committed `AssetResource` and try to
229/// classify the codec by magic prefix. Returns `None` when the read
230/// itself fails or the prefix doesn't match a known signature — callers
231/// must treat both as "no hint available" and fall back to the regular
232/// probe path.
233fn sniff_codec(res: &AssetResource) -> Option<AudioCodec> {
234    let mut buf = [0u8; 16];
235    let read = res.read_at(0, &mut buf).ok()?;
236    AudioCodec::try_from(&buf[..read]).ok()
237}