//! Asset manager for icechunk (icechunk/asset_manager.rs): a caching
//! reader/writer for repository assets — snapshots, manifests, transaction
//! logs, and chunks — layered over a `Storage` backend.
1use async_stream::try_stream;
2use bytes::Bytes;
3use chrono::{DateTime, Utc};
4use futures::{Stream, TryStreamExt};
5use quick_cache::{Weighter, sync::Cache};
6use serde::{Deserialize, Serialize};
7use std::{
8    io::{BufReader, Read},
9    ops::Range,
10    sync::{Arc, atomic::AtomicBool},
11};
12use tokio::sync::Semaphore;
13use tracing::{Span, debug, instrument, trace, warn};
14
15use crate::{
16    Storage,
17    config::CachingConfig,
18    format::{
19        ChunkId, ChunkOffset, IcechunkFormatErrorKind, ManifestId, SnapshotId,
20        format_constants::{self, CompressionAlgorithmBin, FileTypeBin, SpecVersionBin},
21        manifest::Manifest,
22        serializers::{
23            deserialize_manifest, deserialize_snapshot, deserialize_transaction_log,
24            serialize_manifest, serialize_snapshot, serialize_transaction_log,
25        },
26        snapshot::{Snapshot, SnapshotInfo},
27        transaction_log::TransactionLog,
28    },
29    private,
30    repository::{RepositoryError, RepositoryErrorKind, RepositoryResult},
31    storage::{self, Reader},
32};
33
/// Central access point for repository assets (snapshots, manifests,
/// transaction logs and chunks): wraps a `Storage` backend with weighted
/// in-memory caches and a semaphore that bounds concurrent backend requests.
///
/// Only the configuration fields are serialized; the caches, the one-shot
/// warning flags and the semaphore are rebuilt on deserialization through
/// `AssetManagerSerializer` (see the `#[serde(from = ...)]` attribute).
#[derive(Debug, Serialize, Deserialize)]
#[serde(from = "AssetManagerSerializer")]
pub struct AssetManager {
    storage: Arc<dyn Storage + Send + Sync>,
    storage_settings: storage::Settings,
    // Cache capacities in weighted units (see `FileWeighter`): snapshot
    // nodes, chunk references, transaction changes, and bytes.
    num_snapshot_nodes: u64,
    num_chunk_refs: u64,
    num_transaction_changes: u64,
    num_bytes_attributes: u64,
    num_bytes_chunks: u64,
    // zstd compression level used when writing new assets.
    compression_level: u8,

    // Upper bound on simultaneous storage requests; enforced via
    // `request_semaphore` below.
    max_concurrent_requests: u16,

    // One-shot flags so each "cache too small" warning is logged only once
    // per manager instance.
    #[serde(skip)]
    manifest_cache_size_warned: AtomicBool,
    #[serde(skip)]
    snapshot_cache_size_warned: AtomicBool,

    #[serde(skip)]
    snapshot_cache: Cache<SnapshotId, Arc<Snapshot>, FileWeighter>,
    #[serde(skip)]
    manifest_cache: Cache<ManifestId, Arc<Manifest>, FileWeighter>,
    #[serde(skip)]
    transactions_cache: Cache<SnapshotId, Arc<TransactionLog>, FileWeighter>,
    // Chunks are fetched by byte range, so the cache key includes the range.
    #[serde(skip)]
    chunk_cache: Cache<(ChunkId, Range<ChunkOffset>), Bytes, FileWeighter>,

    #[serde(skip)]
    request_semaphore: Semaphore,
}
65
// Seal the crate-private marker trait so external crates cannot implement
// icechunk-internal traits for this type.
impl private::Sealed for AssetManager {}
67
/// Deserialization target carrying only the configuration fields of
/// `AssetManager`. Converted via `From` so the caches, warning flags and
/// semaphore are reconstructed rather than deserialized.
#[derive(Debug, Deserialize)]
struct AssetManagerSerializer {
    storage: Arc<dyn Storage + Send + Sync>,
    storage_settings: storage::Settings,
    num_snapshot_nodes: u64,
    num_chunk_refs: u64,
    num_transaction_changes: u64,
    num_bytes_attributes: u64,
    num_bytes_chunks: u64,
    compression_level: u8,
    max_concurrent_requests: u16,
}
80
81impl From<AssetManagerSerializer> for AssetManager {
82    fn from(value: AssetManagerSerializer) -> Self {
83        AssetManager::new(
84            value.storage,
85            value.storage_settings,
86            value.num_snapshot_nodes,
87            value.num_chunk_refs,
88            value.num_transaction_changes,
89            value.num_bytes_attributes,
90            value.num_bytes_chunks,
91            value.compression_level,
92            value.max_concurrent_requests,
93        )
94    }
95}
96
impl AssetManager {
    /// Build a manager over `storage` with the given cache capacities and
    /// request concurrency limit.
    ///
    /// Capacities are in weighted units (see `FileWeighter`): snapshot
    /// nodes, chunk references, transaction changes, and bytes.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        num_snapshot_nodes: u64,
        num_chunk_refs: u64,
        num_transaction_changes: u64,
        num_bytes_attributes: u64,
        num_bytes_chunks: u64,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self {
            num_snapshot_nodes,
            num_chunk_refs,
            num_transaction_changes,
            num_bytes_attributes,
            num_bytes_chunks,
            compression_level,
            max_concurrent_requests,
            storage,
            storage_settings,
            // Caches are weighted by item size via FileWeighter; the first
            // argument appears to be quick_cache's estimated item count
            // hint — TODO(review): confirm against `Cache::with_weighter`.
            snapshot_cache: Cache::with_weighter(1, num_snapshot_nodes, FileWeighter),
            manifest_cache: Cache::with_weighter(1, num_chunk_refs, FileWeighter),
            transactions_cache: Cache::with_weighter(
                0,
                num_transaction_changes,
                FileWeighter,
            ),
            chunk_cache: Cache::with_weighter(0, num_bytes_chunks, FileWeighter),
            snapshot_cache_size_warned: AtomicBool::new(false),
            manifest_cache_size_warned: AtomicBool::new(false),
            request_semaphore: Semaphore::new(max_concurrent_requests as usize),
        }
    }

    /// Convenience constructor with every cache capacity set to zero,
    /// effectively disabling caching.
    pub fn new_no_cache(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self::new(
            storage,
            storage_settings,
            0,
            0,
            0,
            0,
            0,
            compression_level,
            max_concurrent_requests,
        )
    }

    /// Build a manager taking all cache capacities from `config`.
    pub fn new_with_config(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        config: &CachingConfig,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self::new(
            storage,
            storage_settings,
            config.num_snapshot_nodes(),
            config.num_chunk_refs(),
            config.num_transaction_changes(),
            config.num_bytes_attributes(),
            config.num_bytes_chunks(),
            compression_level,
            max_concurrent_requests,
        )
    }

    /// Evict a snapshot from the cache (no-op if not cached).
    pub fn remove_cached_snapshot(&self, snapshot_id: &SnapshotId) {
        self.snapshot_cache.remove(snapshot_id);
    }

    /// Evict a manifest from the cache (no-op if not cached).
    pub fn remove_cached_manifest(&self, manifest_id: &ManifestId) {
        self.manifest_cache.remove(manifest_id);
    }

    /// Evict a transaction log from the cache (no-op if not cached).
    pub fn remove_cached_tx_log(&self, snapshot_id: &SnapshotId) {
        self.transactions_cache.remove(snapshot_id);
    }

    /// Drop every cached chunk range.
    pub fn clear_chunk_cache(&self) {
        self.chunk_cache.clear();
    }

    /// Serialize, compress and upload `manifest`, then cache it. Returns the
    /// size in bytes of the written file.
    #[instrument(skip(self, manifest))]
    pub async fn write_manifest(&self, manifest: Arc<Manifest>) -> RepositoryResult<u64> {
        let manifest_c = Arc::clone(&manifest);
        let res = write_new_manifest(
            manifest_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        self.warn_if_manifest_cache_small(manifest.as_ref());
        // Pre-populate the cache: the manifest we just wrote is very likely
        // to be read back soon.
        self.manifest_cache.insert(manifest.id().clone(), manifest);
        Ok(res)
    }

    /// Fetch a manifest, serving it from the cache when possible.
    ///
    /// `manifest_size` is the expected serialized size in bytes; pass 0 when
    /// unknown (see `fetch_manifest_unknown_size`). The cache guard API
    /// coalesces concurrent misses for the same id into a single download.
    #[instrument(skip(self))]
    pub async fn fetch_manifest(
        &self,
        manifest_id: &ManifestId,
        manifest_size: u64,
    ) -> RepositoryResult<Arc<Manifest>> {
        match self.manifest_cache.get_value_or_guard_async(manifest_id).await {
            Ok(manifest) => Ok(manifest),
            Err(guard) => {
                let manifest = fetch_manifest(
                    manifest_id,
                    manifest_size,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                self.warn_if_manifest_cache_small(manifest.as_ref());
                // A failed insert just means the guard was invalidated; the
                // fetched value is still returned to the caller.
                let _fail_is_ok = guard.insert(Arc::clone(&manifest));
                Ok(manifest)
            }
        }
    }

    /// Log (once per manager) when a manifest's chunk-ref count exceeds half
    /// the manifest cache capacity, since such manifests churn the cache.
    fn warn_if_manifest_cache_small(&self, manifest: &Manifest) {
        let manifest_weight = manifest.len();
        let capacity = self.num_chunk_refs;
        // TODO: we may need a config to silence this warning
        if manifest_weight as u64 > capacity / 2
            && !self.manifest_cache_size_warned.load(std::sync::atomic::Ordering::Relaxed)
        {
            warn!(
                "A manifest with {manifest_weight} chunk references is being loaded into the cache that can only keep {capacity} references. Consider increasing the size of the manifest cache using the num_chunk_refs field in CachingConfig"
            );
            self.manifest_cache_size_warned
                .store(true, std::sync::atomic::Ordering::Relaxed);
        }
    }

    /// Log (once per manager) when a snapshot's node count exceeds a fifth
    /// of the snapshot cache capacity.
    fn warn_if_snapshot_cache_small(&self, snap: &Snapshot) {
        let snap_weight = snap.len();
        let capacity = self.num_snapshot_nodes;
        // TODO: we may need a config to silence this warning
        if snap_weight as u64 > capacity / 5
            && !self.snapshot_cache_size_warned.load(std::sync::atomic::Ordering::Relaxed)
        {
            warn!(
                "A snapshot with {snap_weight} nodes is being loaded into the cache that can only keep {capacity} nodes. Consider increasing the size of the snapshot cache using the num_snapshot_nodes field in CachingConfig"
            );
            self.snapshot_cache_size_warned
                .store(true, std::sync::atomic::Ordering::Relaxed);
        }
    }

    /// `fetch_manifest` with size 0, forcing the unknown-size (streaming)
    /// storage path on a cache miss.
    #[instrument(skip(self,))]
    pub async fn fetch_manifest_unknown_size(
        &self,
        manifest_id: &ManifestId,
    ) -> RepositoryResult<Arc<Manifest>> {
        self.fetch_manifest(manifest_id, 0).await
    }

    /// Serialize, compress and upload `snapshot`, then cache it under its id.
    #[instrument(skip(self, snapshot))]
    pub async fn write_snapshot(&self, snapshot: Arc<Snapshot>) -> RepositoryResult<()> {
        let snapshot_c = Arc::clone(&snapshot);
        write_new_snapshot(
            snapshot_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        let snapshot_id = snapshot.id().clone();
        self.warn_if_snapshot_cache_small(snapshot.as_ref());
        // This line is critical for expiration:
        // When we edit snapshots in place, we need the cache to return the new version
        self.snapshot_cache.insert(snapshot_id, snapshot);
        Ok(())
    }

    /// Fetch a snapshot, serving it from the cache when possible; concurrent
    /// misses for the same id coalesce into a single download.
    #[instrument(skip(self))]
    pub async fn fetch_snapshot(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<Arc<Snapshot>> {
        match self.snapshot_cache.get_value_or_guard_async(snapshot_id).await {
            Ok(snapshot) => Ok(snapshot),
            Err(guard) => {
                let snapshot = fetch_snapshot(
                    snapshot_id,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                self.warn_if_snapshot_cache_small(snapshot.as_ref());
                let _fail_is_ok = guard.insert(Arc::clone(&snapshot));
                Ok(snapshot)
            }
        }
    }

    /// Serialize, compress and upload a transaction log keyed by the
    /// snapshot id it belongs to, then cache it.
    #[instrument(skip(self, log))]
    pub async fn write_transaction_log(
        &self,
        transaction_id: SnapshotId,
        log: Arc<TransactionLog>,
    ) -> RepositoryResult<()> {
        let log_c = Arc::clone(&log);
        write_new_tx_log(
            transaction_id.clone(),
            log_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        self.transactions_cache.insert(transaction_id, log);
        Ok(())
    }

    /// Fetch a transaction log, serving it from the cache when possible.
    #[instrument(skip(self))]
    pub async fn fetch_transaction_log(
        &self,
        transaction_id: &SnapshotId,
    ) -> RepositoryResult<Arc<TransactionLog>> {
        match self.transactions_cache.get_value_or_guard_async(transaction_id).await {
            Ok(transaction) => Ok(transaction),
            Err(guard) => {
                let transaction = fetch_transaction_log(
                    transaction_id,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                let _fail_is_ok = guard.insert(Arc::clone(&transaction));
                Ok(transaction)
            }
        }
    }

    /// Upload a chunk's bytes, respecting the concurrency limit.
    #[instrument(skip(self, bytes))]
    pub async fn write_chunk(
        &self,
        chunk_id: ChunkId,
        bytes: Bytes,
    ) -> RepositoryResult<()> {
        trace!(%chunk_id, size_bytes=bytes.len(), "Writing chunk");
        let _permit = self.request_semaphore.acquire().await?;
        // we don't pre-populate the chunk cache, there are too many of them for this to be useful
        Ok(self.storage.write_chunk(&self.storage_settings, chunk_id, bytes).await?)
    }

    /// Fetch a byte range of a chunk, serving it from the cache when the
    /// exact (id, range) pair was fetched before.
    #[instrument(skip(self))]
    pub async fn fetch_chunk(
        &self,
        chunk_id: &ChunkId,
        range: &Range<ChunkOffset>,
    ) -> RepositoryResult<Bytes> {
        let key = (chunk_id.clone(), range.clone());
        match self.chunk_cache.get_value_or_guard_async(&key).await {
            Ok(chunk) => Ok(chunk),
            Err(guard) => {
                trace!(%chunk_id, ?range, "Downloading chunk");
                let permit = self.request_semaphore.acquire().await?;
                let chunk = self
                    .storage
                    .fetch_chunk(&self.storage_settings, chunk_id, range)
                    .await?;
                // Release the request slot before touching the cache.
                drop(permit);
                let _fail_is_ok = guard.insert(chunk.clone());
                Ok(chunk)
            }
        }
    }

    /// Returns the sequence of parents of the current session, in order of latest first.
    /// Output stream includes snapshot_id argument
    #[instrument(skip(self))]
    pub async fn snapshot_info_ancestry(
        self: Arc<Self>,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + use<>>
    {
        // Same traversal as `snapshot_ancestry`, mapping each snapshot to
        // its lightweight `SnapshotInfo` summary.
        let res =
            self.snapshot_ancestry(snapshot_id).await?.and_then(|snap| async move {
                let info = snap.as_ref().try_into()?;
                Ok(info)
            });
        Ok(res)
    }

    /// Returns the sequence of parents of the current session, in order of latest first.
    /// Output stream includes snapshot_id argument
    #[instrument(skip(self))]
    pub async fn snapshot_ancestry(
        self: Arc<Self>,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<impl Stream<Item = RepositoryResult<Arc<Snapshot>>> + use<>>
    {
        // Fetch the starting snapshot eagerly so an invalid id fails here,
        // then lazily walk `parent_id` links as the stream is consumed.
        let mut this = self.fetch_snapshot(snapshot_id).await?;
        let stream = try_stream! {
            yield Arc::clone(&this);
            while let Some(parent) = this.parent_id() {
                let snap = self.fetch_snapshot(&parent).await?;
                yield Arc::clone(&snap);
                this = snap;
            }
        };
        Ok(stream)
    }

    /// Ask the storage backend for the last-modified timestamp of a
    /// snapshot object (always hits storage; never cached).
    #[instrument(skip(self))]
    pub async fn get_snapshot_last_modified(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<DateTime<Utc>> {
        debug!(%snapshot_id, "Getting snapshot timestamp");
        let _permit = self.request_semaphore.acquire().await?;
        Ok(self
            .storage
            .get_snapshot_last_modified(&self.storage_settings, snapshot_id)
            .await?)
    }

    /// Fetch a snapshot (through the cache) and convert it to its summary
    /// `SnapshotInfo`.
    #[instrument(skip(self))]
    pub async fn fetch_snapshot_info(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<SnapshotInfo> {
        let snapshot = self.fetch_snapshot(snapshot_id).await?;
        let info = snapshot.as_ref().try_into()?;
        Ok(info)
    }
}
443
444fn binary_file_header(
445    spec_version: SpecVersionBin,
446    file_type: FileTypeBin,
447    compression_algorithm: CompressionAlgorithmBin,
448) -> Vec<u8> {
449    use format_constants::*;
450    // TODO: initialize capacity
451    let mut buffer = Vec::with_capacity(1024);
452    // magic numbers
453    buffer.extend_from_slice(ICECHUNK_FORMAT_MAGIC_BYTES);
454    // implementation name
455    let implementation = format!("{:<24}", &*ICECHUNK_CLIENT_NAME);
456    buffer.extend_from_slice(&implementation.as_bytes()[..24]);
457    // spec version
458    buffer.push(spec_version as u8);
459    buffer.push(file_type as u8);
460    // compression
461    buffer.push(compression_algorithm as u8);
462    buffer
463}
464
465fn check_header(
466    read: &mut (dyn Read + Unpin + Send),
467    file_type: FileTypeBin,
468) -> RepositoryResult<(SpecVersionBin, CompressionAlgorithmBin)> {
469    let mut buf = [0; 12];
470    read.read_exact(&mut buf)?;
471    // Magic numbers
472    if format_constants::ICECHUNK_FORMAT_MAGIC_BYTES != buf {
473        return Err(RepositoryErrorKind::FormatError(
474            IcechunkFormatErrorKind::InvalidMagicNumbers,
475        )
476        .into());
477    }
478
479    let mut buf = [0; 24];
480    // ignore implementation name
481    read.read_exact(&mut buf)?;
482
483    let mut spec_version = 0;
484    read.read_exact(std::slice::from_mut(&mut spec_version))?;
485
486    let spec_version = spec_version.try_into().map_err(|_| {
487        RepositoryErrorKind::FormatError(IcechunkFormatErrorKind::InvalidSpecVersion)
488    })?;
489
490    let mut actual_file_type_int = 0;
491    read.read_exact(std::slice::from_mut(&mut actual_file_type_int))?;
492
493    let actual_file_type: FileTypeBin =
494        actual_file_type_int.try_into().map_err(|_| {
495            RepositoryErrorKind::FormatError(IcechunkFormatErrorKind::InvalidFileType {
496                expected: file_type,
497                got: actual_file_type_int,
498            })
499        })?;
500
501    if actual_file_type != file_type {
502        return Err(RepositoryErrorKind::FormatError(
503            IcechunkFormatErrorKind::InvalidFileType {
504                expected: file_type,
505                got: actual_file_type_int,
506            },
507        )
508        .into());
509    }
510
511    let mut compression = 0;
512    read.read_exact(std::slice::from_mut(&mut compression))?;
513
514    let compression = compression.try_into().map_err(|_| {
515        RepositoryErrorKind::FormatError(
516            IcechunkFormatErrorKind::InvalidCompressionAlgorithm,
517        )
518    })?;
519
520    Ok((spec_version, compression))
521}
522
/// Serialize, zstd-compress, and upload a new manifest file.
///
/// Serialization and compression run on a blocking thread; the upload is
/// gated by `semaphore`. Returns the total size in bytes of the written
/// object (header + compressed payload).
async fn write_new_manifest(
    new_manifest: Arc<Manifest>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<u64> {
    use format_constants::*;
    // Object-store metadata recorded alongside the file: format version,
    // client name, file type, and compression algorithm.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_MANIFEST.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    let id = new_manifest.id().clone();

    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    // TODO: we should compress only when the manifest reaches a certain size
    // but then, we would need to include metadata to know if it's compressed or not
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        // The header stays uncompressed; the encoder appends the compressed
        // serialized manifest after it in the same buffer.
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::Manifest,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;

        serialize_manifest(
            new_manifest.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;

        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    let len = buffer.len() as u64;
    debug!(%id, size_bytes=len, "Writing manifest");
    let _permit = semaphore.acquire().await?;
    storage.write_manifest(storage_settings, id.clone(), metadata, buffer.into()).await?;
    Ok(len)
}
578
/// Download and deserialize a manifest from storage (no caching here; see
/// `AssetManager::fetch_manifest` for the cached entry point).
///
/// When `manifest_size > 0` the known-size fetch path is used; otherwise the
/// backend streams the object with unknown size. Header validation,
/// decompression and deserialization run on a blocking thread.
async fn fetch_manifest(
    manifest_id: &ManifestId,
    manifest_size: u64,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<Manifest>> {
    debug!(%manifest_id, "Downloading manifest");

    let _permit = semaphore.acquire().await?;
    let reader = if manifest_size > 0 {
        storage
            .fetch_manifest_known_size(storage_settings, manifest_id, manifest_size)
            .await?
    } else {
        Reader::Asynchronous(
            storage.fetch_manifest_unknown_size(storage_settings, manifest_id).await?,
        )
    };

    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) =
            check_and_get_decompressor(reader, FileTypeBin::Manifest)?;
        deserialize_manifest(spec_version, decompressor).map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
609
610fn check_and_get_decompressor(
611    data: Reader,
612    file_type: FileTypeBin,
613) -> RepositoryResult<(SpecVersionBin, Box<dyn Read + Send>)> {
614    let mut sync_read = data.into_read();
615    let (spec_version, compression) = check_header(sync_read.as_mut(), file_type)?;
616    debug_assert_eq!(compression, CompressionAlgorithmBin::Zstd);
617    // We find a performance impact if we don't buffer here
618    let decompressor =
619        BufReader::with_capacity(1_024, zstd::stream::Decoder::new(sync_read)?);
620    Ok((spec_version, Box::new(decompressor)))
621}
622
/// Serialize, zstd-compress, and upload a new snapshot file, returning its
/// id.
///
/// Serialization and compression run on a blocking thread; the upload is
/// gated by `semaphore`.
async fn write_new_snapshot(
    new_snapshot: Arc<Snapshot>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<SnapshotId> {
    use format_constants::*;
    // Object-store metadata recorded alongside the file: format version,
    // client name, file type, and compression algorithm.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_SNAPSHOT.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    let id = new_snapshot.id().clone();
    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        // Uncompressed header followed by the compressed serialized snapshot.
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::Snapshot,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;

        serialize_snapshot(
            new_snapshot.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;

        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    debug!(%id, size_bytes=buffer.len(), "Writing snapshot");
    let _permit = semaphore.acquire().await?;
    storage.write_snapshot(storage_settings, id.clone(), metadata, buffer.into()).await?;

    Ok(id)
}
675
/// Download and deserialize a snapshot from storage (no caching here; see
/// `AssetManager::fetch_snapshot` for the cached entry point).
///
/// Header validation, decompression and deserialization run on a blocking
/// thread.
async fn fetch_snapshot(
    snapshot_id: &SnapshotId,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<Snapshot>> {
    debug!(%snapshot_id, "Downloading snapshot");
    let _permit = semaphore.acquire().await?;
    let read = storage.fetch_snapshot(storage_settings, snapshot_id).await?;

    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) = check_and_get_decompressor(
            Reader::Asynchronous(read),
            FileTypeBin::Snapshot,
        )?;
        deserialize_snapshot(spec_version, decompressor).map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
698
/// Serialize, zstd-compress, and upload a transaction log keyed by the
/// snapshot id it belongs to.
///
/// Serialization and compression run on a blocking thread; the upload is
/// gated by `semaphore`.
async fn write_new_tx_log(
    transaction_id: SnapshotId,
    new_log: Arc<TransactionLog>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<()> {
    use format_constants::*;
    // Object-store metadata recorded alongside the file: format version,
    // client name, file type, and compression algorithm.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_TRANSACTION_LOG.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        // Uncompressed header followed by the compressed serialized log.
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::TransactionLog,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;
        serialize_transaction_log(
            new_log.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;
        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    debug!(%transaction_id, size_bytes=buffer.len(), "Writing transaction log");
    let _permit = semaphore.acquire().await?;
    storage
        .write_transaction_log(storage_settings, transaction_id, metadata, buffer.into())
        .await?;

    Ok(())
}
751
/// Download and deserialize a transaction log from storage (no caching
/// here; see `AssetManager::fetch_transaction_log` for the cached entry
/// point).
///
/// Header validation, decompression and deserialization run on a blocking
/// thread.
async fn fetch_transaction_log(
    transaction_id: &SnapshotId,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<TransactionLog>> {
    debug!(%transaction_id, "Downloading transaction log");
    let _permit = semaphore.acquire().await?;
    let read = storage.fetch_transaction_log(storage_settings, transaction_id).await?;

    // Propagate the current tracing span into the blocking task.
    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) = check_and_get_decompressor(
            Reader::Asynchronous(read),
            FileTypeBin::TransactionLog,
        )?;
        deserialize_transaction_log(spec_version, decompressor)
            .map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
775
/// Cache weighter charging each entry by its logical size (`len()`):
/// chunk references for manifests, nodes for snapshots, bytes for chunk
/// ranges, and `len()` units for transaction logs (presumably the number of
/// recorded changes, matching the `num_transaction_changes` capacity —
/// TODO(review): confirm).
#[derive(Debug, Clone)]
struct FileWeighter;

impl Weighter<ManifestId, Arc<Manifest>> for FileWeighter {
    // Weight = number of chunk references in the manifest.
    fn weight(&self, _: &ManifestId, val: &Arc<Manifest>) -> u64 {
        val.len() as u64
    }
}

impl Weighter<SnapshotId, Arc<Snapshot>> for FileWeighter {
    // Weight = number of nodes in the snapshot.
    fn weight(&self, _: &SnapshotId, val: &Arc<Snapshot>) -> u64 {
        val.len() as u64
    }
}

impl Weighter<(ChunkId, Range<ChunkOffset>), Bytes> for FileWeighter {
    // Weight = number of bytes in the cached chunk range.
    fn weight(&self, _: &(ChunkId, Range<ChunkOffset>), val: &Bytes) -> u64 {
        val.len() as u64
    }
}

impl Weighter<SnapshotId, Arc<TransactionLog>> for FileWeighter {
    fn weight(&self, _: &SnapshotId, val: &Arc<TransactionLog>) -> u64 {
        val.len() as u64
    }
}
802
803#[cfg(test)]
804#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
805mod test {
806
807    use icechunk_macros::tokio_test;
808    use itertools::{Itertools, assert_equal};
809
810    use super::*;
811    use crate::{
812        format::{
813            ChunkIndices, NodeId,
814            manifest::{ChunkInfo, ChunkPayload},
815        },
816        storage::{Storage, logging::LoggingStorage, new_in_memory_storage},
817    };
818
819    #[tokio_test]
820    async fn test_caching_caches() -> Result<(), Box<dyn std::error::Error>> {
821        let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
822        let settings = storage::Settings::default();
823        let manager =
824            AssetManager::new_no_cache(backend.clone(), settings.clone(), 1, 100);
825
826        let node1 = NodeId::random();
827        let node2 = NodeId::random();
828        let ci1 = ChunkInfo {
829            node: node1.clone(),
830            coord: ChunkIndices(vec![0]),
831            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"a")),
832        };
833        let ci2 = ChunkInfo {
834            node: node2.clone(),
835            coord: ChunkIndices(vec![1]),
836            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"b")),
837        };
838        let pre_existing_manifest =
839            Manifest::from_iter(vec![ci1].into_iter()).await?.unwrap();
840        let pre_existing_manifest = Arc::new(pre_existing_manifest);
841        let pre_existing_id = pre_existing_manifest.id();
842        let pre_size = manager.write_manifest(Arc::clone(&pre_existing_manifest)).await?;
843
844        let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
845        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
846        let caching = AssetManager::new_with_config(
847            Arc::clone(&logging_c),
848            settings,
849            &CachingConfig::default(),
850            1,
851            100,
852        );
853
854        let manifest =
855            Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter()).await?.unwrap());
856        let id = manifest.id();
857        let size = caching.write_manifest(Arc::clone(&manifest)).await?;
858
859        let fetched = caching.fetch_manifest(&id, size).await?;
860        assert_eq!(fetched.len(), 1);
861        assert_equal(
862            fetched.iter(node2.clone()).map(|x| x.unwrap()),
863            [(ci2.coord.clone(), ci2.payload.clone())],
864        );
865
866        // fetch again
867        caching.fetch_manifest(&id, size).await?;
868        // when we insert we cache, so no fetches
869        assert_eq!(logging.fetch_operations(), vec![]);
870
871        // first time it sees an ID it calls the backend
872        caching.fetch_manifest(&pre_existing_id, pre_size).await?;
873        assert_eq!(
874            logging.fetch_operations(),
875            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
876        );
877
878        // only calls backend once
879        caching.fetch_manifest(&pre_existing_id, pre_size).await?;
880        assert_eq!(
881            logging.fetch_operations(),
882            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
883        );
884
        // other values still cached
886        caching.fetch_manifest(&id, size).await?;
887        assert_eq!(
888            logging.fetch_operations(),
889            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
890        );
891        Ok(())
892    }
893
894    #[tokio_test]
895    async fn test_caching_storage_has_limit() -> Result<(), Box<dyn std::error::Error>> {
896        let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
897        let settings = storage::Settings::default();
898        let manager =
899            AssetManager::new_no_cache(backend.clone(), settings.clone(), 1, 100);
900
901        let ci1 = ChunkInfo {
902            node: NodeId::random(),
903            coord: ChunkIndices(vec![]),
904            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"a")),
905        };
906        let ci2 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
907        let ci3 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
908        let ci4 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
909        let ci5 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
910        let ci6 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
911        let ci7 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
912        let ci8 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
913        let ci9 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
914
915        let manifest1 =
916            Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3]).await?.unwrap());
917        let id1 = manifest1.id();
918        let size1 = manager.write_manifest(Arc::clone(&manifest1)).await?;
919        let manifest2 =
920            Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6]).await?.unwrap());
921        let id2 = manifest2.id();
922        let size2 = manager.write_manifest(Arc::clone(&manifest2)).await?;
923        let manifest3 =
924            Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9]).await?.unwrap());
925        let id3 = manifest3.id();
926        let size3 = manager.write_manifest(Arc::clone(&manifest3)).await?;
927
928        let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
929        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
930        let caching = AssetManager::new_with_config(
931            logging_c,
932            settings,
933            // the cache can only fit 6 refs.
934            &CachingConfig {
935                num_snapshot_nodes: Some(0),
936                num_chunk_refs: Some(7),
937                num_transaction_changes: Some(0),
938                num_bytes_attributes: Some(0),
939                num_bytes_chunks: Some(0),
940            },
941            1,
942            100,
943        );
944
945        // we keep asking for all 3 items, but the cache can only fit 2
946        for _ in 0..20 {
947            caching.fetch_manifest(&id1, size1).await?;
948            caching.fetch_manifest(&id2, size2).await?;
949            caching.fetch_manifest(&id3, size3).await?;
950        }
951        // after the initial warming requests, we only request the file that doesn't fit in the cache
952        assert_eq!(logging.fetch_operations()[10..].iter().unique().count(), 1);
953
954        Ok(())
955    }
956
957    #[tokio_test]
958    async fn test_dont_fetch_asset_twice() -> Result<(), Box<dyn std::error::Error>> {
959        // Test that two concurrent requests for the same manifest doesn't generate two
960        // object_store requests, one of them must wait
961        let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
962        let settings = storage::Settings::default();
963        let manager = Arc::new(AssetManager::new_no_cache(
964            storage.clone(),
965            settings.clone(),
966            1,
967            100,
968        ));
969
970        // some reasonable size so it takes some time to parse
971        let manifest = Manifest::from_iter((0..5_000).map(|_| ChunkInfo {
972            node: NodeId::random(),
973            coord: ChunkIndices(Vec::from([rand::random(), rand::random()])),
974            payload: ChunkPayload::Inline("hello".into()),
975        }))
976        .await
977        .unwrap()
978        .unwrap();
979        let manifest_id = manifest.id().clone();
980        let size = manager.write_manifest(Arc::new(manifest)).await?;
981
982        let logging = Arc::new(LoggingStorage::new(Arc::clone(&storage)));
983        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
984        let manager = Arc::new(AssetManager::new_with_config(
985            logging_c.clone(),
986            logging_c.default_settings(),
987            &CachingConfig::default(),
988            1,
989            100,
990        ));
991
992        let manager_c = Arc::new(manager);
993        let manager_cc = Arc::clone(&manager_c);
994        let manifest_id_c = manifest_id.clone();
995
996        let res1 = tokio::task::spawn(async move {
997            manager_c.fetch_manifest(&manifest_id_c, size).await
998        });
999        let res2 = tokio::task::spawn(async move {
1000            manager_cc.fetch_manifest(&manifest_id, size).await
1001        });
1002
1003        assert!(res1.await?.is_ok());
1004        assert!(res2.await?.is_ok());
1005        assert_eq!(logging.fetch_operations().len(), 1);
1006        Ok(())
1007    }
1008}