1use async_trait::async_trait;
2use flate2::read::GzDecoder;
3use gibblox_casync::{CasyncChunkId, CasyncChunkStore, CasyncIndexSource};
4use gibblox_core::{GibbloxError, GibbloxErrorKind, GibbloxResult, ReadContext};
5use reqwest::{Client, StatusCode};
6use ruzstd::decoding::StreamingDecoder;
7use std::{
8 collections::BTreeMap,
9 io::{Cursor, Read},
10 path::PathBuf,
11 sync::Arc,
12 time::Instant,
13};
14use tokio::sync::{Mutex, Notify};
15use tracing::{trace, warn};
16use url::Url;
17
/// Default file suffix under which compressed chunks are looked up in a chunk store.
const COMPRESSED_SUFFIX_DEFAULT: &str = ".cacnk";
/// Smallest accepted chunk size in bytes; empty chunks are rejected.
const CHUNK_SIZE_LIMIT_MIN: usize = 1;
/// Largest accepted chunk size in bytes (128 MiB), checked on encoded and decoded payloads.
const CHUNK_SIZE_LIMIT_MAX: usize = 128 << 20;

/// Leading magic bytes of an xz stream.
const XZ_SIGNATURE: &[u8] = b"\xfd7zXZ\x00";
/// Leading magic bytes of a gzip member.
const GZIP_SIGNATURE: &[u8] = b"\x1f\x8b";
/// Leading magic bytes of a zstd frame.
const ZSTD_SIGNATURE: &[u8] = b"\x28\xb5\x2f\xfd";
25
26#[derive(Clone, Debug)]
27pub enum StdCasyncIndexLocator {
28 Url(Url),
29 Path(PathBuf),
30}
31
32impl StdCasyncIndexLocator {
33 pub fn path(path: impl Into<PathBuf>) -> Self {
34 Self::Path(path.into())
35 }
36
37 pub fn url(url: Url) -> Self {
38 Self::Url(url)
39 }
40}
41
42pub struct StdCasyncIndexSource {
43 locator: StdCasyncIndexLocator,
44 client: Client,
45}
46
47impl StdCasyncIndexSource {
48 pub fn new(locator: StdCasyncIndexLocator) -> GibbloxResult<Self> {
49 Ok(Self {
50 locator,
51 client: build_http_client()?,
52 })
53 }
54
55 async fn load_from_url(&self, url: &Url) -> GibbloxResult<Vec<u8>> {
56 if url.scheme() == "file" {
57 let path = url.to_file_path().map_err(|_| {
58 GibbloxError::with_message(
59 GibbloxErrorKind::InvalidInput,
60 format!("index file URL is not a valid path: {url}"),
61 )
62 })?;
63 return tokio::fs::read(path)
64 .await
65 .map_err(|err| io_err("read index file", err));
66 }
67
68 let response = self
69 .client
70 .get(url.as_str())
71 .send()
72 .await
73 .map_err(|err| http_err("GET index", err))?;
74
75 if !response.status().is_success() {
76 return Err(GibbloxError::with_message(
77 GibbloxErrorKind::Io,
78 format!(
79 "GET index failed with HTTP status {}: {url}",
80 response.status()
81 ),
82 ));
83 }
84
85 response
86 .bytes()
87 .await
88 .map(|bytes| bytes.to_vec())
89 .map_err(|err| http_err("read index body", err))
90 }
91}
92
93#[async_trait]
94impl CasyncIndexSource for StdCasyncIndexSource {
95 async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
96 trace!(locator = ?self.locator, "loading casync index");
97 match &self.locator {
98 StdCasyncIndexLocator::Path(path) => tokio::fs::read(path)
99 .await
100 .map_err(|err| io_err("read index path", err)),
101 StdCasyncIndexLocator::Url(url) => self.load_from_url(url).await,
102 }
103 }
104}
105
106#[derive(Clone, Debug)]
107pub enum StdCasyncChunkStoreLocator {
108 UrlPrefix(Url),
109 PathPrefix(PathBuf),
110}
111
112impl StdCasyncChunkStoreLocator {
113 pub fn url_prefix(url: Url) -> GibbloxResult<Self> {
114 Ok(Self::UrlPrefix(normalize_url_prefix(url)?))
115 }
116
117 pub fn path_prefix(path: impl Into<PathBuf>) -> Self {
118 Self::PathPrefix(path.into())
119 }
120}
121
/// Settings used to build a [`StdCasyncChunkStore`].
#[derive(Clone, Debug)]
pub struct StdCasyncChunkStoreConfig {
    /// Where chunks are fetched from.
    pub locator: StdCasyncChunkStoreLocator,
    /// Directory in which decoded chunks are cached; `None` disables the cache.
    pub cache_dir: Option<PathBuf>,
    /// When `true`, a chunk missing from the cache is an error instead of being fetched.
    pub offline: bool,
    /// File suffix of compressed chunks, tried before the raw (suffix-less) name. Must start
    /// with `'.'`, otherwise `StdCasyncChunkStore::new` rejects the config.
    pub compressed_suffix: String,
}
129
130impl StdCasyncChunkStoreConfig {
131 pub fn new(locator: StdCasyncChunkStoreLocator) -> Self {
132 Self {
133 locator,
134 cache_dir: None,
135 offline: false,
136 compressed_suffix: COMPRESSED_SUFFIX_DEFAULT.to_string(),
137 }
138 }
139}
140
141pub struct StdCasyncChunkStore {
142 locator: StdCasyncChunkStoreLocator,
143 cache_dir: Option<PathBuf>,
144 offline: bool,
145 compressed_suffix: String,
146 client: Client,
147 in_flight: Mutex<BTreeMap<CasyncChunkId, Arc<Notify>>>,
148}
149
150impl StdCasyncChunkStore {
151 pub fn new(config: StdCasyncChunkStoreConfig) -> GibbloxResult<Self> {
152 if config.compressed_suffix.is_empty() || !config.compressed_suffix.starts_with('.') {
153 return Err(GibbloxError::with_message(
154 GibbloxErrorKind::InvalidInput,
155 "compressed_suffix must start with '.'",
156 ));
157 }
158 Ok(Self {
159 locator: config.locator,
160 cache_dir: config.cache_dir,
161 offline: config.offline,
162 compressed_suffix: config.compressed_suffix,
163 client: build_http_client()?,
164 in_flight: Mutex::new(BTreeMap::new()),
165 })
166 }
167
168 fn cache_path_for_chunk(&self, id: &CasyncChunkId) -> Option<PathBuf> {
169 self.cache_dir
170 .as_ref()
171 .map(|dir| dir.join(id.chunk_store_path(".raw")))
172 }
173
174 async fn load_from_cache(&self, id: &CasyncChunkId) -> GibbloxResult<Option<Vec<u8>>> {
175 let Some(path) = self.cache_path_for_chunk(id) else {
176 return Ok(None);
177 };
178
179 match tokio::fs::read(&path).await {
180 Ok(bytes) => {
181 trace!(chunk = %id, bytes = bytes.len(), path = %path.display(), "chunk cache hit");
182 Ok(Some(bytes))
183 }
184 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
185 Err(err) => Err(io_err("read chunk cache", err)),
186 }
187 }
188
189 async fn write_to_cache(&self, id: &CasyncChunkId, payload: &[u8]) {
190 let Some(path) = self.cache_path_for_chunk(id) else {
191 return;
192 };
193
194 if let Some(parent) = path.parent() {
195 if let Err(err) = tokio::fs::create_dir_all(parent).await {
196 warn!(
197 chunk = %id,
198 path = %parent.display(),
199 error = %err,
200 "failed to create chunk cache directory"
201 );
202 return;
203 }
204 }
205
206 if let Err(err) = tokio::fs::write(&path, payload).await {
207 warn!(
208 chunk = %id,
209 path = %path.display(),
210 error = %err,
211 "failed to persist chunk in cache"
212 );
213 }
214 }
215
216 async fn load_from_source_locator(&self, relative: &str) -> GibbloxResult<Option<Vec<u8>>> {
217 match &self.locator {
218 StdCasyncChunkStoreLocator::PathPrefix(prefix) => {
219 let path = prefix.join(relative);
220 match tokio::fs::read(&path).await {
221 Ok(bytes) => Ok(Some(bytes)),
222 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
223 Err(err) => Err(io_err("read chunk path", err)),
224 }
225 }
226 StdCasyncChunkStoreLocator::UrlPrefix(base) => {
227 let url = chunk_url(base, relative)?;
228 let response = self
229 .client
230 .get(url.as_str())
231 .send()
232 .await
233 .map_err(|err| http_err("GET chunk", err))?;
234
235 if response.status() == StatusCode::NOT_FOUND {
236 return Ok(None);
237 }
238 if !response.status().is_success() {
239 return Err(GibbloxError::with_message(
240 GibbloxErrorKind::Io,
241 format!(
242 "GET chunk failed with HTTP status {}: {url}",
243 response.status()
244 ),
245 ));
246 }
247
248 response
249 .bytes()
250 .await
251 .map(|bytes| Some(bytes.to_vec()))
252 .map_err(|err| http_err("read chunk body", err))
253 }
254 }
255 }
256
257 async fn fetch_chunk_payload(
258 &self,
259 id: &CasyncChunkId,
260 ctx: ReadContext,
261 ) -> GibbloxResult<Vec<u8>> {
262 let compressed_relative = id.chunk_store_path(&self.compressed_suffix);
263 let raw_relative = id.chunk_store_path("");
264
265 let fetch_start = Instant::now();
266
267 let (encoded, source_kind, encoding) =
268 if let Some(bytes) = self.load_from_source_locator(&compressed_relative).await? {
269 (bytes, compressed_relative, ChunkEncoding::Compressed)
270 } else if let Some(bytes) = self.load_from_source_locator(&raw_relative).await? {
271 (bytes, raw_relative, ChunkEncoding::Raw)
272 } else {
273 return Err(GibbloxError::with_message(
274 GibbloxErrorKind::Io,
275 format!("chunk not found in source: {id}"),
276 ));
277 };
278
279 trace!(
280 chunk = %id,
281 priority = ?ctx.priority,
282 source = %source_kind,
283 fetch_ms = fetch_start.elapsed().as_millis() as u64,
284 encoded_bytes = encoded.len(),
285 "fetched chunk payload"
286 );
287
288 let decoded = decode_chunk_payload(&encoded, encoding)?;
289 validate_chunk_bounds(decoded.len())?;
290 self.write_to_cache(id, &decoded).await;
291 Ok(decoded)
292 }
293
294 async fn load_chunk_inner(
295 &self,
296 id: &CasyncChunkId,
297 ctx: ReadContext,
298 ) -> GibbloxResult<Vec<u8>> {
299 loop {
300 if let Some(hit) = self.load_from_cache(id).await? {
301 return Ok(hit);
302 }
303
304 trace!(chunk = %id, "chunk cache miss");
305 if self.offline {
306 return Err(GibbloxError::with_message(
307 GibbloxErrorKind::Io,
308 format!("offline mode and chunk is not cached: {id}"),
309 ));
310 }
311
312 let waiter = {
313 let mut guard = self.in_flight.lock().await;
314 if let Some(notify) = guard.get(id) {
315 Some(Arc::clone(notify))
316 } else {
317 guard.insert(*id, Arc::new(Notify::new()));
318 None
319 }
320 };
321
322 if let Some(waiter) = waiter {
323 waiter.notified().await;
324 continue;
325 }
326
327 let result = self.fetch_chunk_payload(id, ctx).await;
328 if let Some(notify) = self.in_flight.lock().await.remove(id) {
329 notify.notify_waiters();
330 }
331 return result;
332 }
333 }
334}
335
336#[async_trait]
337impl CasyncChunkStore for StdCasyncChunkStore {
338 async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>> {
339 self.load_chunk_inner(id, ctx).await
340 }
341}
342
343fn normalize_url_prefix(url: Url) -> GibbloxResult<Url> {
344 if url.cannot_be_a_base() {
345 return Err(GibbloxError::with_message(
346 GibbloxErrorKind::InvalidInput,
347 format!("chunk store URL cannot be a base: {url}"),
348 ));
349 }
350
351 let mut value = url.as_str().to_owned();
352 if !value.ends_with('/') {
353 value.push('/');
354 }
355 Url::parse(&value).map_err(|err| {
356 GibbloxError::with_message(
357 GibbloxErrorKind::InvalidInput,
358 format!("normalize chunk store URL: {err}"),
359 )
360 })
361}
362
363fn chunk_url(base: &Url, relative: &str) -> GibbloxResult<Url> {
364 base.join(relative).map_err(|err| {
365 GibbloxError::with_message(
366 GibbloxErrorKind::InvalidInput,
367 format!("join chunk URL for {relative}: {err}"),
368 )
369 })
370}
371
372fn build_http_client() -> GibbloxResult<Client> {
373 Client::builder()
374 .connect_timeout(std::time::Duration::from_secs(3))
375 .timeout(std::time::Duration::from_secs(8))
376 .build()
377 .map_err(|err| {
378 GibbloxError::with_message(GibbloxErrorKind::Io, format!("build HTTP client: {err}"))
379 })
380}
381
382fn detect_compression(payload: &[u8]) -> CompressionKind {
383 if payload.starts_with(XZ_SIGNATURE) {
384 return CompressionKind::Xz;
385 }
386 if payload.starts_with(GZIP_SIGNATURE) {
387 return CompressionKind::Gzip;
388 }
389 if payload.starts_with(ZSTD_SIGNATURE) {
390 return CompressionKind::Zstd;
391 }
392 CompressionKind::Raw
393}
394
395fn decode_chunk_payload(encoded: &[u8], encoding: ChunkEncoding) -> GibbloxResult<Vec<u8>> {
396 validate_chunk_bounds(encoded.len())?;
397
398 match encoding {
399 ChunkEncoding::Raw => Ok(encoded.to_vec()),
400 ChunkEncoding::Compressed => decode_compressed_chunk_payload(encoded),
401 }
402}
403
404fn decode_compressed_chunk_payload(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
405 match detect_compression(encoded) {
406 CompressionKind::Raw => Ok(encoded.to_vec()),
407 CompressionKind::Gzip => decode_gzip(encoded),
408 CompressionKind::Zstd => decode_zstd(encoded),
409 CompressionKind::Xz => decode_xz(encoded),
410 }
411}
412
413fn decode_gzip(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
414 let mut decoder = GzDecoder::new(encoded);
415 let mut out = Vec::new();
416 decoder.read_to_end(&mut out).map_err(|err| {
417 GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode gzip chunk: {err}"))
418 })?;
419 Ok(out)
420}
421
422fn decode_zstd(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
423 let mut cursor = Cursor::new(encoded);
424 let mut decoder = StreamingDecoder::new(&mut cursor).map_err(|err| {
425 GibbloxError::with_message(GibbloxErrorKind::Io, format!("init zstd decoder: {err}"))
426 })?;
427 let mut out = Vec::new();
428 decoder.read_to_end(&mut out).map_err(|err| {
429 GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode zstd chunk: {err}"))
430 })?;
431 Ok(out)
432}
433
434fn decode_xz(encoded: &[u8]) -> GibbloxResult<Vec<u8>> {
435 let mut cursor = Cursor::new(encoded);
436 let mut out = Vec::new();
437 lzma_rs::xz_decompress(&mut cursor, &mut out).map_err(|err| {
438 GibbloxError::with_message(GibbloxErrorKind::Io, format!("decode xz chunk: {err}"))
439 })?;
440 Ok(out)
441}
442
443fn validate_chunk_bounds(size: usize) -> GibbloxResult<()> {
444 if !(CHUNK_SIZE_LIMIT_MIN..=CHUNK_SIZE_LIMIT_MAX).contains(&size) {
445 return Err(GibbloxError::with_message(
446 GibbloxErrorKind::InvalidInput,
447 format!("chunk size is out of bounds: {size}"),
448 ));
449 }
450 Ok(())
451}
452
453fn io_err(op: &str, err: std::io::Error) -> GibbloxError {
454 GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
455}
456
457fn http_err(op: &str, err: reqwest::Error) -> GibbloxError {
458 GibbloxError::with_message(GibbloxErrorKind::Io, format!("{op}: {err}"))
459}
460
/// How a chunk fetched from the source is stored.
#[derive(Copy, Clone)]
enum ChunkEncoding {
    /// Found under the compressed suffix; decompressed according to its magic bytes.
    Compressed,
    /// Found under the bare chunk path; used verbatim.
    Raw,
}
466
/// Codec detected from a compressed payload's leading magic bytes.
#[derive(Copy, Clone)]
enum CompressionKind {
    /// xz stream.
    Xz,
    /// zstd frame.
    Zstd,
    /// gzip member.
    Gzip,
    /// No recognised signature; bytes are passed through unchanged.
    Raw,
}
474
#[cfg(test)]
mod tests {
    use super::{
        COMPRESSED_SUFFIX_DEFAULT, ChunkEncoding, StdCasyncChunkStore, StdCasyncChunkStoreConfig,
        StdCasyncChunkStoreLocator, decode_chunk_payload,
    };
    use flate2::{Compression, write::GzEncoder};
    use gibblox_casync::{CasyncChunkId, CasyncChunkStore};
    use gibblox_core::ReadContext;
    use sha2::{Digest, Sha256};
    use std::{io::Write, path::Path};

    #[tokio::test]
    async fn path_source_populates_and_uses_cache() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let payload = b"chunk-payload".to_vec();
        let id = chunk_id_for(&payload);
        write_raw_chunk(src.path(), &id, &payload);

        let store = cached_path_store(src.path(), cache.path());

        let first = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("first load");
        assert_eq!(first, payload);

        // With the source copy gone, only the cache can satisfy the second load.
        std::fs::remove_file(src.path().join(id.chunk_store_path("")))
            .expect("remove source chunk");
        let second = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("second load from cache");
        assert_eq!(second, payload);
    }

    #[tokio::test]
    async fn offline_mode_fails_on_uncached_chunk() {
        let src = tempfile::tempdir().expect("src tempdir");
        let mut config = StdCasyncChunkStoreConfig::new(StdCasyncChunkStoreLocator::path_prefix(
            src.path().to_path_buf(),
        ));
        config.offline = true;

        let store = StdCasyncChunkStore::new(config).expect("build chunk store");
        let id = CasyncChunkId::from_bytes([0xab; 32]);

        let err = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect_err("offline miss should fail");
        assert_eq!(err.kind(), gibblox_core::GibbloxErrorKind::Io);
    }

    #[tokio::test]
    async fn compressed_cacnk_is_decoded_before_return() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let payload = b"hello compressed chunk".to_vec();
        let id = chunk_id_for(&payload);
        write_gzip_chunk(src.path(), &id, &payload);

        let store = cached_path_store(src.path(), cache.path());

        let loaded = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("load compressed chunk");
        assert_eq!(loaded, payload);

        let decoded =
            decode_chunk_payload(&read_gzip_chunk(src.path(), &id), ChunkEncoding::Compressed);
        assert_eq!(decoded.expect("decode helper"), payload);
    }

    #[tokio::test]
    async fn raw_chunk_with_gzip_magic_is_not_decoded_as_compressed() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let mut payload = b"pretend raw chunk".to_vec();
        payload[0] = 0x1f;
        payload[1] = 0x8b;
        let id = chunk_id_for(&payload);
        write_raw_chunk(src.path(), &id, &payload);

        let store = cached_path_store(src.path(), cache.path());

        let loaded = store
            .load_chunk(&id, ReadContext::FOREGROUND)
            .await
            .expect("load raw chunk with gzip magic");
        assert_eq!(loaded, payload);

        let decoded = decode_chunk_payload(&payload, ChunkEncoding::Raw);
        assert_eq!(decoded.expect("decode helper"), payload);
    }

    /// Regression guard for in-flight coalescing: concurrent loads of one chunk must all
    /// complete with the same payload (waiters are woken and then served from the cache).
    #[tokio::test]
    async fn concurrent_loads_of_same_chunk_all_succeed() {
        let src = tempfile::tempdir().expect("src tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");

        let payload = b"shared chunk payload".to_vec();
        let id = chunk_id_for(&payload);
        write_raw_chunk(src.path(), &id, &payload);

        let store = cached_path_store(src.path(), cache.path());

        let (a, b, c) = tokio::join!(
            store.load_chunk(&id, ReadContext::FOREGROUND),
            store.load_chunk(&id, ReadContext::FOREGROUND),
            store.load_chunk(&id, ReadContext::FOREGROUND)
        );
        assert_eq!(a.expect("first concurrent load"), payload);
        assert_eq!(b.expect("second concurrent load"), payload);
        assert_eq!(c.expect("third concurrent load"), payload);
    }

    /// Builds a store reading chunks from `src` and caching decoded chunks in `cache`.
    fn cached_path_store(src: &Path, cache: &Path) -> StdCasyncChunkStore {
        let mut config = StdCasyncChunkStoreConfig::new(StdCasyncChunkStoreLocator::path_prefix(
            src.to_path_buf(),
        ));
        config.cache_dir = Some(cache.to_path_buf());
        StdCasyncChunkStore::new(config).expect("build chunk store")
    }

    fn chunk_id_for(payload: &[u8]) -> CasyncChunkId {
        let digest = Sha256::digest(payload);
        let mut bytes = [0u8; 32];
        bytes.copy_from_slice(&digest);
        CasyncChunkId::from_bytes(bytes)
    }

    /// Writes `bytes` to `base/relative`, creating parent directories as needed.
    fn write_chunk_file(base: &Path, relative: &str, bytes: &[u8]) {
        let path = base.join(relative);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).expect("create chunk dir");
        }
        std::fs::write(path, bytes).expect("write chunk file");
    }

    fn write_raw_chunk(base: &Path, id: &CasyncChunkId, payload: &[u8]) {
        write_chunk_file(base, &id.chunk_store_path(""), payload);
    }

    fn write_gzip_chunk(base: &Path, id: &CasyncChunkId, payload: &[u8]) {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(payload).expect("encode payload");
        let encoded = encoder.finish().expect("finish gzip payload");
        write_chunk_file(
            base,
            &id.chunk_store_path(COMPRESSED_SUFFIX_DEFAULT),
            &encoded,
        );
    }

    fn read_gzip_chunk(base: &Path, id: &CasyncChunkId) -> Vec<u8> {
        std::fs::read(base.join(id.chunk_store_path(COMPRESSED_SUFFIX_DEFAULT)))
            .expect("read gzip chunk")
    }
}