Skip to main content

gibblox_casync_std/
lib.rs

1use async_trait::async_trait;
2use flate2::read::GzDecoder;
3use gibblox_casync::{CasyncChunkId, CasyncChunkStore, CasyncIndexSource};
4use gibblox_core::{GibbloxError, GibbloxErrorKind, GibbloxResult, ReadContext};
5use reqwest::{Client, StatusCode};
6use ruzstd::decoding::StreamingDecoder;
7use std::{
8    collections::BTreeMap,
9    io::{Cursor, Read},
10    path::PathBuf,
11    sync::Arc,
12    time::Instant,
13};
14use tokio::sync::{Mutex, Notify};
15use tracing::{trace, warn};
16use url::Url;
17
/// Default file suffix of compressed chunks at the chunk-store source.
const COMPRESSED_SUFFIX_DEFAULT: &str = ".cacnk";
/// Smallest accepted chunk size in bytes; empty payloads are rejected.
const CHUNK_SIZE_LIMIT_MIN: usize = 1;
/// Largest accepted chunk size in bytes (128 MiB), checked for both encoded and decoded payloads.
const CHUNK_SIZE_LIMIT_MAX: usize = 128 << 20;

/// Leading magic bytes of an xz stream.
const XZ_SIGNATURE: &[u8] = b"\xfd7zXZ\x00";
/// Leading magic bytes of a gzip member.
const GZIP_SIGNATURE: &[u8] = b"\x1f\x8b";
/// Leading magic bytes of a zstd frame.
const ZSTD_SIGNATURE: &[u8] = b"\x28\xb5\x2f\xfd";
25
26#[derive(Clone, Debug)]
27pub enum StdCasyncIndexLocator {
28    Url(Url),
29    Path(PathBuf),
30}
31
32impl StdCasyncIndexLocator {
33    pub fn path(path: impl Into<PathBuf>) -> Self {
34        Self::Path(path.into())
35    }
36
37    pub fn url(url: Url) -> Self {
38        Self::Url(url)
39    }
40}
41
42pub struct StdCasyncIndexSource {
43    locator: StdCasyncIndexLocator,
44    client: Client,
45}
46
47impl StdCasyncIndexSource {
48    pub fn new(locator: StdCasyncIndexLocator) -> GibbloxResult<Self> {
49        Ok(Self {
50            locator,
51            client: build_http_client()?,
52        })
53    }
54
55    async fn load_from_url(&self, url: &Url) -> GibbloxResult<Vec<u8>> {
56        if url.scheme() == "file" {
57            let path = url.to_file_path().map_err(|_| {
58                GibbloxError::with_message(
59                    GibbloxErrorKind::InvalidInput,
60                    format!("index file URL is not a valid path: {url}"),
61                )
62            })?;
63            return tokio::fs::read(path)
64                .await
65                .map_err(|err| io_err("read index file", err));
66        }
67
68        let response = self
69            .client
70            .get(url.as_str())
71            .send()
72            .await
73            .map_err(|err| http_err("GET index", err))?;
74
75        if !response.status().is_success() {
76            return Err(GibbloxError::with_message(
77                GibbloxErrorKind::Io,
78                format!(
79                    "GET index failed with HTTP status {}: {url}",
80                    response.status()
81                ),
82            ));
83        }
84
85        response
86            .bytes()
87            .await
88            .map(|bytes| bytes.to_vec())
89            .map_err(|err| http_err("read index body", err))
90    }
91}
92
93#[async_trait]
94impl CasyncIndexSource for StdCasyncIndexSource {
95    async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
96        trace!(locator = ?self.locator, "loading casync index");
97        match &self.locator {
98            StdCasyncIndexLocator::Path(path) => tokio::fs::read(path)
99                .await
100                .map_err(|err| io_err("read index path", err)),
101            StdCasyncIndexLocator::Url(url) => self.load_from_url(url).await,
102        }
103    }
104}
105
106#[derive(Clone, Debug)]
107pub enum StdCasyncChunkStoreLocator {
108    UrlPrefix(Url),
109    PathPrefix(PathBuf),
110}
111
112impl StdCasyncChunkStoreLocator {
113    pub fn url_prefix(url: Url) -> GibbloxResult<Self> {
114        Ok(Self::UrlPrefix(normalize_url_prefix(url)?))
115    }
116
117    pub fn path_prefix(path: impl Into<PathBuf>) -> Self {
118        Self::PathPrefix(path.into())
119    }
120}
121
122#[derive(Clone, Debug)]
123pub struct StdCasyncChunkStoreConfig {
124    pub locator: StdCasyncChunkStoreLocator,
125    pub cache_dir: Option<PathBuf>,
126    pub offline: bool,
127    pub compressed_suffix: String,
128}
129
130impl StdCasyncChunkStoreConfig {
131    pub fn new(locator: StdCasyncChunkStoreLocator) -> Self {
132        Self {
133            locator,
134            cache_dir: None,
135            offline: false,
136            compressed_suffix: COMPRESSED_SUFFIX_DEFAULT.to_string(),
137        }
138    }
139}
140
141pub struct StdCasyncChunkStore {
142    locator: StdCasyncChunkStoreLocator,
143    cache_dir: Option<PathBuf>,
144    offline: bool,
145    compressed_suffix: String,
146    client: Client,
147    in_flight: Mutex<BTreeMap<CasyncChunkId, Arc<Notify>>>,
148}
149
150impl StdCasyncChunkStore {
151    pub fn new(config: StdCasyncChunkStoreConfig) -> GibbloxResult<Self> {
152        if config.compressed_suffix.is_empty() || !config.compressed_suffix.starts_with('.') {
153            return Err(GibbloxError::with_message(
154                GibbloxErrorKind::InvalidInput,
155                "compressed_suffix must start with '.'",
156            ));
157        }
158        Ok(Self {
159            locator: config.locator,
160            cache_dir: config.cache_dir,
161            offline: config.offline,
162            compressed_suffix: config.compressed_suffix,
163            client: build_http_client()?,
164            in_flight: Mutex::new(BTreeMap::new()),
165        })
166    }
167
168    fn cache_path_for_chunk(&self, id: &CasyncChunkId) -> Option<PathBuf> {
169        self.cache_dir
170            .as_ref()
171            .map(|dir| dir.join(id.chunk_store_path(".raw")))
172    }
173
174    async fn load_from_cache(&self, id: &CasyncChunkId) -> GibbloxResult<Option<Vec<u8>>> {
175        let Some(path) = self.cache_path_for_chunk(id) else {
176            return Ok(None);
177        };
178
179        match tokio::fs::read(&path).await {
180            Ok(bytes) => {
181                trace!(chunk = %id, bytes = bytes.len(), path = %path.display(), "chunk cache hit");
182                Ok(Some(bytes))
183            }
184            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
185            Err(err) => Err(io_err("read chunk cache", err)),
186        }
187    }
188
189    async fn write_to_cache(&self, id: &CasyncChunkId, payload: &[u8]) {
190        let Some(path) = self.cache_path_for_chunk(id) else {
191            return;
192        };
193
194        if let Some(parent) = path.parent() {
195            if let Err(err) = tokio::fs::create_dir_all(parent).await {
196                warn!(
197                    chunk = %id,
198                    path = %parent.display(),
199                    error = %err,
200                    "failed to create chunk cache directory"
201                );
202                return;
203            }
204        }
205
206        if let Err(err) = tokio::fs::write(&path, payload).await {
207            warn!(
208                chunk = %id,
209                path = %path.display(),
210                error = %err,
211                "failed to persist chunk in cache"
212            );
213        }
214    }
215
216    async fn load_from_source_locator(&self, relative: &str) -> GibbloxResult<Option<Vec<u8>>> {
217        match &self.locator {
218            StdCasyncChunkStoreLocator::PathPrefix(prefix) => {
219                let path = prefix.join(relative);
220                match tokio::fs::read(&path).await {
221                    Ok(bytes) => Ok(Some(bytes)),
222                    Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
223                    Err(err) => Err(io_err("read chunk path", err)),
224                }
225            }
226            StdCasyncChunkStoreLocator::UrlPrefix(base) => {
227                let url = chunk_url(base, relative)?;
228                let response = self
229                    .client
230                    .get(url.as_str())
231                    .send()
232                    .await
233                    .map_err(|err| http_err("GET chunk", err))?;
234
235                if response.status() == StatusCode::NOT_FOUND {
236                    return Ok(None);
237                }
238                if !response.status().is_success() {
239                    return Err(GibbloxError::with_message(
240                        GibbloxErrorKind::Io,
241                        format!(
242                            "GET chunk failed with HTTP status {}: {url}",
243                            response.status()
244                        ),
245                    ));
246                }
247
248                response
249                    .bytes()
250                    .await
251                    .map(|bytes| Some(bytes.to_vec()))
252                    .map_err(|err| http_err("read chunk body", err))
253            }
254        }
255    }
256
257    async fn fetch_chunk_payload(
258        &self,
259        id: &CasyncChunkId,
260        ctx: ReadContext,
261    ) -> GibbloxResult<Vec<u8>> {
262        let compressed_relative = id.chunk_store_path(&self.compressed_suffix);
263        let raw_relative = id.chunk_store_path("");
264
265        let fetch_start = Instant::now();
266
267        let (encoded, source_kind, encoding) =
268            if let Some(bytes) = self.load_from_source_locator(&compressed_relative).await? {
269                (bytes, compressed_relative, ChunkEncoding::Compressed)
270            } else if let Some(bytes) = self.load_from_source_locator(&raw_relative).await? {
271                (bytes, raw_relative, ChunkEncoding::Raw)
272            } else {
273                return Err(GibbloxError::with_message(
274                    GibbloxErrorKind::Io,
275                    format!("chunk not found in source: {id}"),
276                ));
277            };
278
279        trace!(
280            chunk = %id,
281            priority = ?ctx.priority,
282            source = %source_kind,
283            fetch_ms = fetch_start.elapsed().as_millis() as u64,
284            encoded_bytes = encoded.len(),
285            "fetched chunk payload"
286        );
287
288        let decoded = decode_chunk_payload(&encoded, encoding)?;
289        validate_chunk_bounds(decoded.len())?;
290        self.write_to_cache(id, &decoded).await;
291        Ok(decoded)
292    }
293
294    async fn load_chunk_inner(
295        &self,
296        id: &CasyncChunkId,
297        ctx: ReadContext,
298    ) -> GibbloxResult<Vec<u8>> {
299        loop {
300            if let Some(hit) = self.load_from_cache(id).await? {
301                return Ok(hit);
302            }
303
304            trace!(chunk = %id, "chunk cache miss");
305            if self.offline {
306                return Err(GibbloxError::with_message(
307                    GibbloxErrorKind::Io,
308                    format!("offline mode and chunk is not cached: {id}"),
309                ));
310            }
311
312            let waiter = {
313                let mut guard = self.in_flight.lock().await;
314                if let Some(notify) = guard.get(id) {
315                    Some(Arc::clone(notify))
316                } else {
317                    guard.insert(*id, Arc::new(Notify::new()));
318                    None
319                }
320            };
321
322            if let Some(waiter) = waiter {
323                waiter.notified().await;
324                continue;
325            }
326
327            let result = self.fetch_chunk_payload(id, ctx).await;
328            if let Some(notify) = self.in_flight.lock().await.remove(id) {
329                notify.notify_waiters();
330            }
331            return result;
332        }
333    }
334}
335
336#[async_trait]
337impl CasyncChunkStore for StdCasyncChunkStore {
338    async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>> {
339        self.load_chunk_inner(id, ctx).await
340    }
341}
342
343fn normalize_url_prefix(url: Url) -> GibbloxResult<Url> {
344    if url.cannot_be_a_base() {
345        return Err(GibbloxError::with_message(
346            GibbloxErrorKind::InvalidInput,
347            format!("chunk store URL cannot be a base: {url}"),
348        ));
349    }
350
351    let mut value = url.as_str().to_owned();
352    if !value.ends_with('/') {
353        value.push('/');
354    }
355    Url::parse(&value).map_err(|err| {
356        GibbloxError::with_message(
357            GibbloxErrorKind::InvalidInput,
358            format!("normalize chunk store URL: {err}"),
359        )
360    })
361}
362
363fn chunk_url(base: &Url, relative: &str) -> GibbloxResult<Url> {
364    base.join(relative).map_err(|err| {
365        GibbloxError::with_message(
366            GibbloxErrorKind::InvalidInput,
367            format!("join chunk URL for {relative}: {err}"),
368        )
369    })
370}
371
372fn build_http_client() -> GibbloxResult<Client> {
373    Client::builder()
374        .connect_timeout(std::time::Duration::from_secs(3))
375        .timeout(std::time::Duration::from_secs(8))
376        .build()
377        .map_err(|err| {
378            GibbloxError::with_message(GibbloxErrorKind::Io, format!("build HTTP client: {err}"))
379        })
380}
381
382fn detect_compression(payload: &[u8]) -> CompressionKind {
383    if payload.starts_with(XZ_SIGNATURE) {
384        return CompressionKind::Xz;
385    }
386    if payload.starts_with(GZIP_SIGNATURE) {
387        return CompressionKind::Gzip;
388    }
389    if payload.starts_with(ZSTD_SIGNATURE) {
390        return CompressionKind::Zstd;
391    }
392    CompressionKind::Raw
393}
394
395fn decode_chunk_payload(encoded: &[u8], encoding: ChunkEncoding) -> GibbloxResult<Vec<u8>> {
396    validate_chunk_bounds(encoded.len())?;
397
398    match encoding {
399        ChunkEncoding::Raw => Ok(encoded.to_vec()),
400        ChunkEncoding::Compressed => decode_compressed_chunk_payload(encoded),
401    }
402}
403
404fn decode_compressed_chunk_payload(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
405    match detect_compression(encoded) {
406        CompressionKind::Raw => Ok(encoded.to_vec()),
407        CompressionKind::Gzip => decode_gzip(encoded),
408        CompressionKind::Zstd => decode_zstd(encoded),
409        CompressionKind::Xz => decode_xz(encoded),
410    }
411}
412
413fn decode_gzip(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
414    let mut decoder = GzDecoder::new(encoded);
415    let mut out = Vec::new();
416    decoder.read_to_end(&mut out).map_err(|err| {
417        GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode gzip chunk: {err}"))
418    })?;
419    Ok(out)
420}
421
422fn decode_zstd(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
423    let mut cursor = Cursor::new(encoded);
424    let mut decoder = StreamingDecoder::new(&mut cursor).map_err(|err| {
425        GibbloxError::with_message(GibbloxErrorKind::Io, format!("init zstd decoder: {err}"))
426    })?;
427    let mut out = Vec::new();
428    decoder.read_to_end(&mut out).map_err(|err| {
429        GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode zstd chunk: {err}"))
430    })?;
431    Ok(out)
432}
433
434fn decode_xz(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
435    let mut cursor = Cursor::new(encoded);
436    let mut out = Vec::new();
437    lzma_rs::xz_decompress(&mut cursor, &mut out).map_err(|err| {
438        GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode xz chunk: {err}"))
439    })?;
440    Ok(out)
441}
442
443fn validate_chunk_bounds(size: usize) -> GibbloxResult<()> {
444    if !(CHUNK_SIZE_LIMIT_MIN..=CHUNK_SIZE_LIMIT_MAX).contains(&size) {
445        return Err(GibbloxError::with_message(
446            GibbloxErrorKind::InvalidInput,
447            format!("chunk size is out of bounds: {size}"),
448        ));
449    }
450    Ok(())
451}
452
453fn io_err(op: &str, err: std::io::Error) -> GibbloxError {
454    GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
455}
456
457fn http_err(op: &str, err: reqwest::Error) -> GibbloxError {
458    GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
459}
460
/// Which source path a chunk was read from; this decides whether it is decompressed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ChunkEncoding {
    /// Read from the raw (suffix-less) chunk path; returned unchanged.
    Raw,
    /// Read from the compressed-suffix path; decompressed according to its magic bytes.
    Compressed,
}

/// Compression format detected from a payload's leading magic bytes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CompressionKind {
    /// No known signature; the payload is passed through as-is.
    Raw,
    Gzip,
    Zstd,
    Xz,
}
474
#[cfg(test)]
mod tests {
    use super::{
        ChunkEncoding, StdCasyncChunkStore, StdCasyncChunkStoreConfig, StdCasyncChunkStoreLocator,
        decode_chunk_payload,
    };
    use flate2::{Compression, write::GzEncoder};
    use gibblox_casync::{CasyncChunkId, CasyncChunkStore};
    use gibblox_core::ReadContext;
    use sha2::{Digest, Sha256};
    use std::{
        io::Write,
        path::{Path, PathBuf},
    };

    /// Builds a store over the chunk directory `source`, caching into `cache` when given.
    fn store_over(source: &Path, cache: Option<&Path>, offline: bool) -> StdCasyncChunkStore {
        let locator = StdCasyncChunkStoreLocator::path_prefix(source.to_path_buf());
        let mut config = StdCasyncChunkStoreConfig::new(locator);
        config.cache_dir = cache.map(Path::to_path_buf);
        config.offline = offline;
        StdCasyncChunkStore::new(config).expect("build chunk store")
    }

    #[tokio::test]
    async fn path_source_populates_and_uses_cache() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let payload = b"chunk-payload".to_vec();
        let id = chunk_id_for(&payload);
        let source_file = write_raw_chunk(src.path(), &id, &payload);
        let store = store_over(src.path(), Some(cache.path()), false);

        let first = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("first load");
        assert_eq!(first, payload);

        // With the source copy gone, only the cache can satisfy the second load.
        std::fs::remove_file(&source_file).expect("remove source chunk");
        let second = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("second load from cache");
        assert_eq!(second, payload);
    }

    #[tokio::test]
    async fn offline_mode_fails_on_uncached_chunk() {
        let src = tempfile::tempdir().expect("src tempdir");
        let store = store_over(src.path(), None, true);
        let missing = CasyncChunkId::from_bytes([0xab; 32]);

        let err = store
            .load_chunk(&missing, ReadContext::FOREGROUND)
            .await
            .expect_err("offline miss should fail");
        assert_eq!(err.kind(), gibblox_core::GibbloxErrorKind::Io);
    }

    #[tokio::test]
    async fn compressed_cacnk_is_decoded_before_return() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let payload = b"hello compressed chunk".to_vec();
        let id = chunk_id_for(&payload);
        let gzip_file = write_gzip_chunk(src.path(), &id, &payload);
        let store = store_over(src.path(), Some(cache.path()), false);

        let loaded = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("load compressed chunk");
        assert_eq!(loaded, payload);

        let on_disk = std::fs::read(&gzip_file).expect("read gzip chunk");
        let decoded = decode_chunk_payload(&on_disk, ChunkEncoding::Compressed)
            .expect("decode helper");
        assert_eq!(decoded, payload);
    }

    #[tokio::test]
    async fn raw_chunk_with_gzip_magic_is_not_decoded_as_compressed() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        // Raw bytes that merely start with the gzip signature must not be decompressed.
        let mut payload = b"pretend raw chunk".to_vec();
        payload[..2].copy_from_slice(&[0x1f, 0x8b]);
        let id = chunk_id_for(&payload);
        write_raw_chunk(src.path(), &id, &payload);
        let store = store_over(src.path(), Some(cache.path()), false);

        let loaded = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("load raw chunk with gzip magic");
        assert_eq!(loaded, payload);

        let decoded = decode_chunk_payload(&payload, ChunkEncoding::Raw).expect("decode helper");
        assert_eq!(decoded, payload);
    }

    /// Derives the chunk id of `payload` from its SHA-256 digest.
    fn chunk_id_for(payload: &[u8]) -> CasyncChunkId {
        let digest = Sha256::digest(payload);
        let mut id_bytes = [0u8; 32];
        id_bytes.copy_from_slice(&digest);
        CasyncChunkId::from_bytes(id_bytes)
    }

    /// Creates the parent directory of `path` if it has one.
    fn create_parent_dir(path: &Path) {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).expect("create chunk dir");
        }
    }

    /// Writes `payload` uncompressed at the raw chunk path of `id` and returns that path.
    fn write_raw_chunk(base: &Path, id: &CasyncChunkId, payload: &[u8]) -> PathBuf {
        let path = base.join(id.chunk_store_path(""));
        create_parent_dir(&path);
        std::fs::write(&path, payload).expect("write raw chunk");
        path
    }

    /// Writes `payload` gzip-compressed at the `.cacnk` path of `id` and returns that path.
    fn write_gzip_chunk(base: &Path, id: &CasyncChunkId, payload: &[u8]) -> PathBuf {
        let path = base.join(id.chunk_store_path(".cacnk"));
        create_parent_dir(&path);

        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(payload).expect("encode payload");
        let encoded = encoder.finish().expect("finish gzip payload");
        std::fs::write(&path, encoded).expect("write gzip chunk");
        path
    }
}