1use async_stream::try_stream;
2use bytes::Bytes;
3use chrono::{DateTime, Utc};
4use futures::{Stream, TryStreamExt};
5use quick_cache::{Weighter, sync::Cache};
6use serde::{Deserialize, Serialize};
7use std::{
8 io::{BufReader, Read},
9 ops::Range,
10 sync::{Arc, atomic::AtomicBool},
11};
12use tokio::sync::Semaphore;
13use tracing::{Span, debug, instrument, trace, warn};
14
15use crate::{
16 Storage,
17 config::CachingConfig,
18 format::{
19 ChunkId, ChunkOffset, IcechunkFormatErrorKind, ManifestId, SnapshotId,
20 format_constants::{self, CompressionAlgorithmBin, FileTypeBin, SpecVersionBin},
21 manifest::Manifest,
22 serializers::{
23 deserialize_manifest, deserialize_snapshot, deserialize_transaction_log,
24 serialize_manifest, serialize_snapshot, serialize_transaction_log,
25 },
26 snapshot::{Snapshot, SnapshotInfo},
27 transaction_log::TransactionLog,
28 },
29 private,
30 repository::{RepositoryError, RepositoryErrorKind, RepositoryResult},
31 storage::{self, Reader},
32};
33
/// Central access point for repository assets (snapshots, manifests,
/// transaction logs and chunks).
///
/// Wraps a [`Storage`] backend, adding per-asset-type weighted in-memory
/// caches and a semaphore bounding the number of concurrent storage requests.
/// Deserialization goes through `AssetManagerSerializer` (see `serde(from)`)
/// so the caches, warning flags and semaphore are rebuilt, not serialized.
#[derive(Debug, Serialize, Deserialize)]
#[serde(from = "AssetManagerSerializer")]
pub struct AssetManager {
    storage: Arc<dyn Storage + Send + Sync>,
    storage_settings: storage::Settings,
    // Cache capacities in asset-specific units (nodes, chunk refs, changes,
    // bytes); fed to the weighted caches below.
    num_snapshot_nodes: u64,
    num_chunk_refs: u64,
    num_transaction_changes: u64,
    // Carried as configuration only; not used to size any cache in this file.
    num_bytes_attributes: u64,
    num_bytes_chunks: u64,
    // Zstd level used when writing snapshots/manifests/transaction logs.
    compression_level: u8,

    // Number of semaphore permits, i.e. max in-flight storage requests.
    max_concurrent_requests: u16,

    // One-shot flags so each "cache too small" warning is logged only once.
    #[serde(skip)]
    manifest_cache_size_warned: AtomicBool,
    #[serde(skip)]
    snapshot_cache_size_warned: AtomicBool,

    // Weighted caches: entries are weighted by FileWeighter (len() or bytes),
    // so capacity is a total weight, not an entry count.
    #[serde(skip)]
    snapshot_cache: Cache<SnapshotId, Arc<Snapshot>, FileWeighter>,
    #[serde(skip)]
    manifest_cache: Cache<ManifestId, Arc<Manifest>, FileWeighter>,
    #[serde(skip)]
    transactions_cache: Cache<SnapshotId, Arc<TransactionLog>, FileWeighter>,
    // Keyed by (chunk id, byte range): different ranges of the same chunk are
    // distinct cache entries.
    #[serde(skip)]
    chunk_cache: Cache<(ChunkId, Range<ChunkOffset>), Bytes, FileWeighter>,

    // Throttles concurrent requests against the storage backend.
    #[serde(skip)]
    request_semaphore: Semaphore,
}
65
// Marker impl: AssetManager participates in the crate's sealed-trait pattern.
impl private::Sealed for AssetManager {}
67
/// Deserialization mirror of `AssetManager`'s configuration fields.
///
/// `AssetManager` is deserialized through this type (via `#[serde(from)]`)
/// so runtime-only state (caches, warning flags, semaphore) can be rebuilt
/// by `AssetManager::new` instead of being serialized.
#[derive(Debug, Deserialize)]
struct AssetManagerSerializer {
    storage: Arc<dyn Storage + Send + Sync>,
    storage_settings: storage::Settings,
    num_snapshot_nodes: u64,
    num_chunk_refs: u64,
    num_transaction_changes: u64,
    num_bytes_attributes: u64,
    num_bytes_chunks: u64,
    compression_level: u8,
    max_concurrent_requests: u16,
}
80
81impl From<AssetManagerSerializer> for AssetManager {
82 fn from(value: AssetManagerSerializer) -> Self {
83 AssetManager::new(
84 value.storage,
85 value.storage_settings,
86 value.num_snapshot_nodes,
87 value.num_chunk_refs,
88 value.num_transaction_changes,
89 value.num_bytes_attributes,
90 value.num_bytes_chunks,
91 value.compression_level,
92 value.max_concurrent_requests,
93 )
94 }
95}
96
impl AssetManager {
    /// Build a manager with explicit cache capacities.
    ///
    /// Capacities are in asset-specific units: `num_snapshot_nodes` snapshot
    /// nodes, `num_chunk_refs` manifest chunk references,
    /// `num_transaction_changes` transaction-log changes and
    /// `num_bytes_chunks` bytes of chunk data. `num_bytes_attributes` is
    /// stored but not used to size any cache here. `max_concurrent_requests`
    /// is the number of semaphore permits bounding concurrent storage calls.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        num_snapshot_nodes: u64,
        num_chunk_refs: u64,
        num_transaction_changes: u64,
        num_bytes_attributes: u64,
        num_bytes_chunks: u64,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self {
            num_snapshot_nodes,
            num_chunk_refs,
            num_transaction_changes,
            num_bytes_attributes,
            num_bytes_chunks,
            compression_level,
            max_concurrent_requests,
            storage,
            storage_settings,
            // Weighted caches: second argument is the total weight capacity
            // (nodes/refs/changes/bytes), not an entry count.
            // NOTE(review): the estimated-items hint is 1 for the first two
            // caches and 0 for the last two — confirm the asymmetry is
            // intentional.
            snapshot_cache: Cache::with_weighter(1, num_snapshot_nodes, FileWeighter),
            manifest_cache: Cache::with_weighter(1, num_chunk_refs, FileWeighter),
            transactions_cache: Cache::with_weighter(
                0,
                num_transaction_changes,
                FileWeighter,
            ),
            chunk_cache: Cache::with_weighter(0, num_bytes_chunks, FileWeighter),
            snapshot_cache_size_warned: AtomicBool::new(false),
            manifest_cache_size_warned: AtomicBool::new(false),
            request_semaphore: Semaphore::new(max_concurrent_requests as usize),
        }
    }

    /// Convenience constructor with every cache capacity set to 0, which
    /// effectively disables caching.
    pub fn new_no_cache(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self::new(
            storage,
            storage_settings,
            0,
            0,
            0,
            0,
            0,
            compression_level,
            max_concurrent_requests,
        )
    }

    /// Construct taking all cache capacities from a [`CachingConfig`].
    pub fn new_with_config(
        storage: Arc<dyn Storage + Send + Sync>,
        storage_settings: storage::Settings,
        config: &CachingConfig,
        compression_level: u8,
        max_concurrent_requests: u16,
    ) -> Self {
        Self::new(
            storage,
            storage_settings,
            config.num_snapshot_nodes(),
            config.num_chunk_refs(),
            config.num_transaction_changes(),
            config.num_bytes_attributes(),
            config.num_bytes_chunks(),
            compression_level,
            max_concurrent_requests,
        )
    }

    /// Evict a single snapshot from the snapshot cache (no-op if absent).
    pub fn remove_cached_snapshot(&self, snapshot_id: &SnapshotId) {
        self.snapshot_cache.remove(snapshot_id);
    }

    /// Evict a single manifest from the manifest cache (no-op if absent).
    pub fn remove_cached_manifest(&self, manifest_id: &ManifestId) {
        self.manifest_cache.remove(manifest_id);
    }

    /// Evict a transaction log (keyed by its snapshot id) from the cache.
    pub fn remove_cached_tx_log(&self, snapshot_id: &SnapshotId) {
        self.transactions_cache.remove(snapshot_id);
    }

    /// Drop every cached chunk range.
    pub fn clear_chunk_cache(&self) {
        self.chunk_cache.clear();
    }

    /// Serialize, compress and upload a manifest, then cache it.
    ///
    /// Returns the size in bytes of the compressed object as written to
    /// storage (the value callers should later pass to `fetch_manifest`).
    #[instrument(skip(self, manifest))]
    pub async fn write_manifest(&self, manifest: Arc<Manifest>) -> RepositoryResult<u64> {
        let manifest_c = Arc::clone(&manifest);
        let res = write_new_manifest(
            manifest_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        self.warn_if_manifest_cache_small(manifest.as_ref());
        // Cache only after the write succeeded.
        self.manifest_cache.insert(manifest.id().clone(), manifest);
        Ok(res)
    }

    /// Fetch a manifest, serving from cache when possible.
    ///
    /// `manifest_size` is the compressed object size; pass 0 when unknown
    /// (see `fetch_manifest_unknown_size`). The quick_cache guard ensures a
    /// single in-flight download per manifest id even under concurrent calls
    /// (exercised by `test_dont_fetch_asset_twice`).
    #[instrument(skip(self))]
    pub async fn fetch_manifest(
        &self,
        manifest_id: &ManifestId,
        manifest_size: u64,
    ) -> RepositoryResult<Arc<Manifest>> {
        match self.manifest_cache.get_value_or_guard_async(manifest_id).await {
            Ok(manifest) => Ok(manifest),
            Err(guard) => {
                let manifest = fetch_manifest(
                    manifest_id,
                    manifest_size,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                self.warn_if_manifest_cache_small(manifest.as_ref());
                // Insert can fail (e.g. guard invalidated); that's acceptable,
                // we still return the fetched manifest.
                let _fail_is_ok = guard.insert(Arc::clone(&manifest));
                Ok(manifest)
            }
        }
    }

    /// Log (once per process) when a single manifest uses more than half the
    /// manifest cache's weight capacity.
    fn warn_if_manifest_cache_small(&self, manifest: &Manifest) {
        let manifest_weight = manifest.len();
        let capacity = self.num_chunk_refs;
        // Relaxed ordering is enough: worst case the warning prints twice.
        if manifest_weight as u64 > capacity / 2
            && !self.manifest_cache_size_warned.load(std::sync::atomic::Ordering::Relaxed)
        {
            warn!(
                "A manifest with {manifest_weight} chunk references is being loaded into the cache that can only keep {capacity} references. Consider increasing the size of the manifest cache using the num_chunk_refs field in CachingConfig"
            );
            self.manifest_cache_size_warned
                .store(true, std::sync::atomic::Ordering::Relaxed);
        }
    }

    /// Log (once per process) when a single snapshot uses more than a fifth
    /// of the snapshot cache's weight capacity.
    fn warn_if_snapshot_cache_small(&self, snap: &Snapshot) {
        let snap_weight = snap.len();
        let capacity = self.num_snapshot_nodes;
        if snap_weight as u64 > capacity / 5
            && !self.snapshot_cache_size_warned.load(std::sync::atomic::Ordering::Relaxed)
        {
            warn!(
                "A snapshot with {snap_weight} nodes is being loaded into the cache that can only keep {capacity} nodes. Consider increasing the size of the snapshot cache using the num_snapshot_nodes field in CachingConfig"
            );
            self.snapshot_cache_size_warned
                .store(true, std::sync::atomic::Ordering::Relaxed);
        }
    }

    /// `fetch_manifest` with size 0, signalling "size unknown" so the
    /// non-size-hinted storage path is used.
    #[instrument(skip(self,))]
    pub async fn fetch_manifest_unknown_size(
        &self,
        manifest_id: &ManifestId,
    ) -> RepositoryResult<Arc<Manifest>> {
        self.fetch_manifest(manifest_id, 0).await
    }

    /// Serialize, compress and upload a snapshot, then cache it.
    #[instrument(skip(self, snapshot))]
    pub async fn write_snapshot(&self, snapshot: Arc<Snapshot>) -> RepositoryResult<()> {
        let snapshot_c = Arc::clone(&snapshot);
        write_new_snapshot(
            snapshot_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        let snapshot_id = snapshot.id().clone();
        self.warn_if_snapshot_cache_small(snapshot.as_ref());
        self.snapshot_cache.insert(snapshot_id, snapshot);
        Ok(())
    }

    /// Fetch a snapshot, serving from cache when possible; concurrent calls
    /// for the same id share a single download via the cache guard.
    #[instrument(skip(self))]
    pub async fn fetch_snapshot(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<Arc<Snapshot>> {
        match self.snapshot_cache.get_value_or_guard_async(snapshot_id).await {
            Ok(snapshot) => Ok(snapshot),
            Err(guard) => {
                let snapshot = fetch_snapshot(
                    snapshot_id,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                self.warn_if_snapshot_cache_small(snapshot.as_ref());
                let _fail_is_ok = guard.insert(Arc::clone(&snapshot));
                Ok(snapshot)
            }
        }
    }

    /// Serialize, compress and upload a transaction log (keyed by the
    /// snapshot id it belongs to), then cache it.
    #[instrument(skip(self, log))]
    pub async fn write_transaction_log(
        &self,
        transaction_id: SnapshotId,
        log: Arc<TransactionLog>,
    ) -> RepositoryResult<()> {
        let log_c = Arc::clone(&log);
        write_new_tx_log(
            transaction_id.clone(),
            log_c,
            self.compression_level,
            self.storage.as_ref(),
            &self.storage_settings,
            &self.request_semaphore,
        )
        .await?;
        self.transactions_cache.insert(transaction_id, log);
        Ok(())
    }

    /// Fetch a transaction log, serving from cache when possible.
    #[instrument(skip(self))]
    pub async fn fetch_transaction_log(
        &self,
        transaction_id: &SnapshotId,
    ) -> RepositoryResult<Arc<TransactionLog>> {
        match self.transactions_cache.get_value_or_guard_async(transaction_id).await {
            Ok(transaction) => Ok(transaction),
            Err(guard) => {
                let transaction = fetch_transaction_log(
                    transaction_id,
                    self.storage.as_ref(),
                    &self.storage_settings,
                    &self.request_semaphore,
                )
                .await?;
                let _fail_is_ok = guard.insert(Arc::clone(&transaction));
                Ok(transaction)
            }
        }
    }

    /// Upload a chunk. The semaphore permit is held for the duration of the
    /// storage write. Written chunks are not inserted into the chunk cache.
    #[instrument(skip(self, bytes))]
    pub async fn write_chunk(
        &self,
        chunk_id: ChunkId,
        bytes: Bytes,
    ) -> RepositoryResult<()> {
        trace!(%chunk_id, size_bytes=bytes.len(), "Writing chunk");
        let _permit = self.request_semaphore.acquire().await?;
        Ok(self.storage.write_chunk(&self.storage_settings, chunk_id, bytes).await?)
    }

    /// Fetch a byte range of a chunk, serving from the (id, range)-keyed
    /// cache when possible.
    #[instrument(skip(self))]
    pub async fn fetch_chunk(
        &self,
        chunk_id: &ChunkId,
        range: &Range<ChunkOffset>,
    ) -> RepositoryResult<Bytes> {
        let key = (chunk_id.clone(), range.clone());
        match self.chunk_cache.get_value_or_guard_async(&key).await {
            Ok(chunk) => Ok(chunk),
            Err(guard) => {
                trace!(%chunk_id, ?range, "Downloading chunk");
                let permit = self.request_semaphore.acquire().await?;
                let chunk = self
                    .storage
                    .fetch_chunk(&self.storage_settings, chunk_id, range)
                    .await?;
                // Release the request permit before touching the cache.
                drop(permit);
                let _fail_is_ok = guard.insert(chunk.clone());
                Ok(chunk)
            }
        }
    }

    /// Stream `SnapshotInfo` for the given snapshot and all its ancestors,
    /// in the same order as `snapshot_ancestry`.
    #[instrument(skip(self))]
    pub async fn snapshot_info_ancestry(
        self: Arc<Self>,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<impl Stream<Item = RepositoryResult<SnapshotInfo>> + use<>>
    {
        let res =
            self.snapshot_ancestry(snapshot_id).await?.and_then(|snap| async move {
                let info = snap.as_ref().try_into()?;
                Ok(info)
            });
        Ok(res)
    }

    /// Stream the given snapshot followed by each parent, walking parent ids
    /// until a snapshot with no parent is reached. Fetches go through the
    /// snapshot cache; failures surface as stream items.
    #[instrument(skip(self))]
    pub async fn snapshot_ancestry(
        self: Arc<Self>,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<impl Stream<Item = RepositoryResult<Arc<Snapshot>>> + use<>>
    {
        // The first fetch happens eagerly so an invalid id fails here rather
        // than inside the stream.
        let mut this = self.fetch_snapshot(snapshot_id).await?;
        let stream = try_stream! {
            yield Arc::clone(&this);
            while let Some(parent) = this.parent_id() {
                let snap = self.fetch_snapshot(&parent).await?;
                yield Arc::clone(&snap);
                this = snap;
            }
        };
        Ok(stream)
    }

    /// Ask the storage backend for a snapshot object's last-modified time.
    /// Always hits storage (not cached).
    #[instrument(skip(self))]
    pub async fn get_snapshot_last_modified(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<DateTime<Utc>> {
        debug!(%snapshot_id, "Getting snapshot timestamp");
        let _permit = self.request_semaphore.acquire().await?;
        Ok(self
            .storage
            .get_snapshot_last_modified(&self.storage_settings, snapshot_id)
            .await?)
    }

    /// Fetch a snapshot (through the cache) and convert it to `SnapshotInfo`.
    #[instrument(skip(self))]
    pub async fn fetch_snapshot_info(
        &self,
        snapshot_id: &SnapshotId,
    ) -> RepositoryResult<SnapshotInfo> {
        let snapshot = self.fetch_snapshot(snapshot_id).await?;
        let info = snapshot.as_ref().try_into()?;
        Ok(info)
    }
}
443
444fn binary_file_header(
445 spec_version: SpecVersionBin,
446 file_type: FileTypeBin,
447 compression_algorithm: CompressionAlgorithmBin,
448) -> Vec<u8> {
449 use format_constants::*;
450 let mut buffer = Vec::with_capacity(1024);
452 buffer.extend_from_slice(ICECHUNK_FORMAT_MAGIC_BYTES);
454 let implementation = format!("{:<24}", &*ICECHUNK_CLIENT_NAME);
456 buffer.extend_from_slice(&implementation.as_bytes()[..24]);
457 buffer.push(spec_version as u8);
459 buffer.push(file_type as u8);
460 buffer.push(compression_algorithm as u8);
462 buffer
463}
464
465fn check_header(
466 read: &mut (dyn Read + Unpin + Send),
467 file_type: FileTypeBin,
468) -> RepositoryResult<(SpecVersionBin, CompressionAlgorithmBin)> {
469 let mut buf = [0; 12];
470 read.read_exact(&mut buf)?;
471 if format_constants::ICECHUNK_FORMAT_MAGIC_BYTES != buf {
473 return Err(RepositoryErrorKind::FormatError(
474 IcechunkFormatErrorKind::InvalidMagicNumbers,
475 )
476 .into());
477 }
478
479 let mut buf = [0; 24];
480 read.read_exact(&mut buf)?;
482
483 let mut spec_version = 0;
484 read.read_exact(std::slice::from_mut(&mut spec_version))?;
485
486 let spec_version = spec_version.try_into().map_err(|_| {
487 RepositoryErrorKind::FormatError(IcechunkFormatErrorKind::InvalidSpecVersion)
488 })?;
489
490 let mut actual_file_type_int = 0;
491 read.read_exact(std::slice::from_mut(&mut actual_file_type_int))?;
492
493 let actual_file_type: FileTypeBin =
494 actual_file_type_int.try_into().map_err(|_| {
495 RepositoryErrorKind::FormatError(IcechunkFormatErrorKind::InvalidFileType {
496 expected: file_type,
497 got: actual_file_type_int,
498 })
499 })?;
500
501 if actual_file_type != file_type {
502 return Err(RepositoryErrorKind::FormatError(
503 IcechunkFormatErrorKind::InvalidFileType {
504 expected: file_type,
505 got: actual_file_type_int,
506 },
507 )
508 .into());
509 }
510
511 let mut compression = 0;
512 read.read_exact(std::slice::from_mut(&mut compression))?;
513
514 let compression = compression.try_into().map_err(|_| {
515 RepositoryErrorKind::FormatError(
516 IcechunkFormatErrorKind::InvalidCompressionAlgorithm,
517 )
518 })?;
519
520 Ok((spec_version, compression))
521}
522
/// Serialize, zstd-compress and upload a manifest; returns the compressed
/// size in bytes.
///
/// Serialization and compression run on a blocking thread (with the current
/// tracing span propagated); the semaphore permit is acquired only afterwards,
/// immediately before the storage write, so CPU work does not hold a request
/// slot.
async fn write_new_manifest(
    new_manifest: Arc<Manifest>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<u64> {
    use format_constants::*;
    // Object-store metadata describing format version, client, file type and
    // compression.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_MANIFEST.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    let id = new_manifest.id().clone();

    let span = Span::current();
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        // The compressed stream is appended after the uncompressed header.
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::Manifest,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;

        serialize_manifest(
            new_manifest.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;

        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    let len = buffer.len() as u64;
    debug!(%id, size_bytes=len, "Writing manifest");
    let _permit = semaphore.acquire().await?;
    storage.write_manifest(storage_settings, id.clone(), metadata, buffer.into()).await?;
    Ok(len)
}
578
/// Download and deserialize a manifest from storage.
///
/// `manifest_size == 0` means the size is unknown, selecting the
/// non-size-hinted fetch path. Header validation, decompression and
/// deserialization run on a blocking thread.
///
/// NOTE(review): the semaphore permit is held until the function returns,
/// i.e. also during deserialization — unlike `AssetManager::fetch_chunk`,
/// which drops its permit right after the download. Confirm this is
/// intentional.
async fn fetch_manifest(
    manifest_id: &ManifestId,
    manifest_size: u64,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<Manifest>> {
    debug!(%manifest_id, "Downloading manifest");

    let _permit = semaphore.acquire().await?;
    let reader = if manifest_size > 0 {
        storage
            .fetch_manifest_known_size(storage_settings, manifest_id, manifest_size)
            .await?
    } else {
        Reader::Asynchronous(
            storage.fetch_manifest_unknown_size(storage_settings, manifest_id).await?,
        )
    };

    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) =
            check_and_get_decompressor(reader, FileTypeBin::Manifest)?;
        deserialize_manifest(spec_version, decompressor).map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
609
610fn check_and_get_decompressor(
611 data: Reader,
612 file_type: FileTypeBin,
613) -> RepositoryResult<(SpecVersionBin, Box<dyn Read + Send>)> {
614 let mut sync_read = data.into_read();
615 let (spec_version, compression) = check_header(sync_read.as_mut(), file_type)?;
616 debug_assert_eq!(compression, CompressionAlgorithmBin::Zstd);
617 let decompressor =
619 BufReader::with_capacity(1_024, zstd::stream::Decoder::new(sync_read)?);
620 Ok((spec_version, Box::new(decompressor)))
621}
622
/// Serialize, zstd-compress and upload a snapshot; returns its id.
///
/// Mirrors `write_new_manifest`: CPU work happens on a blocking thread, and
/// the semaphore permit is acquired only for the storage write.
async fn write_new_snapshot(
    new_snapshot: Arc<Snapshot>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<SnapshotId> {
    use format_constants::*;
    // Object-store metadata describing format version, client, file type and
    // compression.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_SNAPSHOT.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    let id = new_snapshot.id().clone();
    let span = Span::current();
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::Snapshot,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;

        serialize_snapshot(
            new_snapshot.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;

        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    debug!(%id, size_bytes=buffer.len(), "Writing snapshot");
    let _permit = semaphore.acquire().await?;
    storage.write_snapshot(storage_settings, id.clone(), metadata, buffer.into()).await?;

    Ok(id)
}
675
/// Download and deserialize a snapshot from storage.
///
/// Header validation, decompression and deserialization run on a blocking
/// thread; the semaphore permit is held until the function returns.
async fn fetch_snapshot(
    snapshot_id: &SnapshotId,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<Snapshot>> {
    debug!(%snapshot_id, "Downloading snapshot");
    let _permit = semaphore.acquire().await?;
    let read = storage.fetch_snapshot(storage_settings, snapshot_id).await?;

    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) = check_and_get_decompressor(
            Reader::Asynchronous(read),
            FileTypeBin::Snapshot,
        )?;
        deserialize_snapshot(spec_version, decompressor).map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
698
/// Serialize, zstd-compress and upload a transaction log under
/// `transaction_id` (the id of the snapshot it belongs to).
///
/// Mirrors `write_new_manifest`: CPU work on a blocking thread, semaphore
/// permit acquired only for the storage write.
async fn write_new_tx_log(
    transaction_id: SnapshotId,
    new_log: Arc<TransactionLog>,
    compression_level: u8,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<()> {
    use format_constants::*;
    // Object-store metadata describing format version, client, file type and
    // compression.
    let metadata = vec![
        (
            LATEST_ICECHUNK_FORMAT_VERSION_METADATA_KEY.to_string(),
            (SpecVersionBin::current() as u8).to_string(),
        ),
        (ICECHUNK_CLIENT_NAME_METADATA_KEY.to_string(), ICECHUNK_CLIENT_NAME.to_string()),
        (
            ICECHUNK_FILE_TYPE_METADATA_KEY.to_string(),
            ICECHUNK_FILE_TYPE_TRANSACTION_LOG.to_string(),
        ),
        (
            ICECHUNK_COMPRESSION_METADATA_KEY.to_string(),
            ICECHUNK_COMPRESSION_ZSTD.to_string(),
        ),
    ];

    let span = Span::current();
    let buffer = tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let buffer = binary_file_header(
            SpecVersionBin::current(),
            FileTypeBin::TransactionLog,
            CompressionAlgorithmBin::Zstd,
        );
        let mut compressor =
            zstd::stream::Encoder::new(buffer, compression_level as i32)?;
        serialize_transaction_log(
            new_log.as_ref(),
            SpecVersionBin::current(),
            &mut compressor,
        )?;
        compressor.finish().map_err(RepositoryErrorKind::IOError)
    })
    .await??;

    debug!(%transaction_id, size_bytes=buffer.len(), "Writing transaction log");
    let _permit = semaphore.acquire().await?;
    storage
        .write_transaction_log(storage_settings, transaction_id, metadata, buffer.into())
        .await?;

    Ok(())
}
751
/// Download and deserialize a transaction log from storage.
///
/// Header validation, decompression and deserialization run on a blocking
/// thread; the semaphore permit is held until the function returns.
async fn fetch_transaction_log(
    transaction_id: &SnapshotId,
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    semaphore: &Semaphore,
) -> RepositoryResult<Arc<TransactionLog>> {
    debug!(%transaction_id, "Downloading transaction log");
    let _permit = semaphore.acquire().await?;
    let read = storage.fetch_transaction_log(storage_settings, transaction_id).await?;

    let span = Span::current();
    tokio::task::spawn_blocking(move || {
        let _entered = span.entered();
        let (spec_version, decompressor) = check_and_get_decompressor(
            Reader::Asynchronous(read),
            FileTypeBin::TransactionLog,
        )?;
        deserialize_transaction_log(spec_version, decompressor)
            .map_err(RepositoryError::from)
    })
    .await?
    .map(Arc::new)
}
775
/// Weighs cache entries by their logical size so cache capacities are
/// expressed in asset-specific units (refs/nodes/changes/bytes) rather than
/// entry counts.
#[derive(Debug, Clone)]
struct FileWeighter;

impl Weighter<ManifestId, Arc<Manifest>> for FileWeighter {
    // Weight = manifest length; capacity is `num_chunk_refs` (the warning in
    // warn_if_manifest_cache_small calls these "chunk references").
    fn weight(&self, _: &ManifestId, val: &Arc<Manifest>) -> u64 {
        val.len() as u64
    }
}

impl Weighter<SnapshotId, Arc<Snapshot>> for FileWeighter {
    // Weight = snapshot length; capacity is `num_snapshot_nodes`.
    fn weight(&self, _: &SnapshotId, val: &Arc<Snapshot>) -> u64 {
        val.len() as u64
    }
}

impl Weighter<(ChunkId, Range<ChunkOffset>), Bytes> for FileWeighter {
    // Weight = number of bytes cached for this (chunk, range) entry.
    fn weight(&self, _: &(ChunkId, Range<ChunkOffset>), val: &Bytes) -> u64 {
        val.len() as u64
    }
}

impl Weighter<SnapshotId, Arc<TransactionLog>> for FileWeighter {
    // Weight = transaction-log length; capacity is `num_transaction_changes`.
    fn weight(&self, _: &SnapshotId, val: &Arc<TransactionLog>) -> u64 {
        val.len() as u64
    }
}
802
#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
mod test {

    use icechunk_macros::tokio_test;
    use itertools::{Itertools, assert_equal};

    use super::*;
    use crate::{
        format::{
            ChunkIndices, NodeId,
            manifest::{ChunkInfo, ChunkPayload},
        },
        storage::{Storage, logging::LoggingStorage, new_in_memory_storage},
    };

    /// Fetching assets through a caching manager must hit the backend at most
    /// once per asset, including assets written before the cache existed.
    #[tokio_test]
    async fn test_caching_caches() -> Result<(), Box<dyn std::error::Error>> {
        let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
        let settings = storage::Settings::default();
        let manager =
            AssetManager::new_no_cache(backend.clone(), settings.clone(), 1, 100);

        let node1 = NodeId::random();
        let node2 = NodeId::random();
        let ci1 = ChunkInfo {
            node: node1.clone(),
            coord: ChunkIndices(vec![0]),
            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"a")),
        };
        let ci2 = ChunkInfo {
            node: node2.clone(),
            coord: ChunkIndices(vec![1]),
            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"b")),
        };
        // Written through the non-caching manager: only present in the backend.
        let pre_existing_manifest =
            Manifest::from_iter(vec![ci1].into_iter()).await?.unwrap();
        let pre_existing_manifest = Arc::new(pre_existing_manifest);
        let pre_existing_id = pre_existing_manifest.id();
        let pre_size = manager.write_manifest(Arc::clone(&pre_existing_manifest)).await?;

        let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
        let caching = AssetManager::new_with_config(
            Arc::clone(&logging_c),
            settings,
            &CachingConfig::default(),
            1,
            100,
        );

        let manifest =
            Arc::new(Manifest::from_iter(vec![ci2.clone()].into_iter()).await?.unwrap());
        let id = manifest.id();
        let size = caching.write_manifest(Arc::clone(&manifest)).await?;

        let fetched = caching.fetch_manifest(&id, size).await?;
        assert_eq!(fetched.len(), 1);
        assert_equal(
            fetched.iter(node2.clone()).map(|x| x.unwrap()),
            [(ci2.coord.clone(), ci2.payload.clone())],
        );

        // Written through the caching manager: no backend fetch needed.
        caching.fetch_manifest(&id, size).await?;
        assert_eq!(logging.fetch_operations(), vec![]);

        // First fetch of the pre-existing manifest hits the backend once...
        caching.fetch_manifest(&pre_existing_id, pre_size).await?;
        assert_eq!(
            logging.fetch_operations(),
            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
        );

        // ...and subsequent fetches are served from cache.
        caching.fetch_manifest(&pre_existing_id, pre_size).await?;
        assert_eq!(
            logging.fetch_operations(),
            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
        );

        caching.fetch_manifest(&id, size).await?;
        assert_eq!(
            logging.fetch_operations(),
            vec![("fetch_manifest_splitting".to_string(), pre_existing_id.to_string())]
        );
        Ok(())
    }

    /// With a capacity of 7 chunk refs and three 3-ref manifests, repeated
    /// fetches must stabilize: once warm, only one manifest keeps being
    /// re-fetched from the backend.
    #[tokio_test]
    async fn test_caching_storage_has_limit() -> Result<(), Box<dyn std::error::Error>> {
        let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
        let settings = storage::Settings::default();
        let manager =
            AssetManager::new_no_cache(backend.clone(), settings.clone(), 1, 100);

        let ci1 = ChunkInfo {
            node: NodeId::random(),
            coord: ChunkIndices(vec![]),
            payload: ChunkPayload::Inline(Bytes::copy_from_slice(b"a")),
        };
        let ci2 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci3 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci4 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci5 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci6 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci7 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci8 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };
        let ci9 = ChunkInfo { node: NodeId::random(), ..ci1.clone() };

        let manifest1 =
            Arc::new(Manifest::from_iter(vec![ci1, ci2, ci3]).await?.unwrap());
        let id1 = manifest1.id();
        let size1 = manager.write_manifest(Arc::clone(&manifest1)).await?;
        let manifest2 =
            Arc::new(Manifest::from_iter(vec![ci4, ci5, ci6]).await?.unwrap());
        let id2 = manifest2.id();
        let size2 = manager.write_manifest(Arc::clone(&manifest2)).await?;
        let manifest3 =
            Arc::new(Manifest::from_iter(vec![ci7, ci8, ci9]).await?.unwrap());
        let id3 = manifest3.id();
        let size3 = manager.write_manifest(Arc::clone(&manifest3)).await?;

        let logging = Arc::new(LoggingStorage::new(Arc::clone(&backend)));
        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
        let caching = AssetManager::new_with_config(
            logging_c,
            settings,
            // Only the manifest cache is enabled: room for 7 chunk refs, i.e.
            // two of the three 3-ref manifests at a time.
            &CachingConfig {
                num_snapshot_nodes: Some(0),
                num_chunk_refs: Some(7),
                num_transaction_changes: Some(0),
                num_bytes_attributes: Some(0),
                num_bytes_chunks: Some(0),
            },
            1,
            100,
        );

        for _ in 0..20 {
            caching.fetch_manifest(&id1, size1).await?;
            caching.fetch_manifest(&id2, size2).await?;
            caching.fetch_manifest(&id3, size3).await?;
        }
        // After warm-up, only a single manifest id keeps missing the cache.
        assert_eq!(logging.fetch_operations()[10..].iter().unique().count(), 1);

        Ok(())
    }

    /// Two concurrent fetches of the same manifest must produce a single
    /// backend request (quick_cache's guard deduplicates in-flight loads).
    #[tokio_test]
    async fn test_dont_fetch_asset_twice() -> Result<(), Box<dyn std::error::Error>> {
        let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
        let settings = storage::Settings::default();
        let manager = Arc::new(AssetManager::new_no_cache(
            storage.clone(),
            settings.clone(),
            1,
            100,
        ));

        let manifest = Manifest::from_iter((0..5_000).map(|_| ChunkInfo {
            node: NodeId::random(),
            coord: ChunkIndices(Vec::from([rand::random(), rand::random()])),
            payload: ChunkPayload::Inline("hello".into()),
        }))
        .await
        .unwrap()
        .unwrap();
        let manifest_id = manifest.id().clone();
        let size = manager.write_manifest(Arc::new(manifest)).await?;

        let logging = Arc::new(LoggingStorage::new(Arc::clone(&storage)));
        let logging_c: Arc<dyn Storage + Send + Sync> = logging.clone();
        let manager = Arc::new(AssetManager::new_with_config(
            logging_c.clone(),
            logging_c.default_settings(),
            &CachingConfig::default(),
            1,
            100,
        ));

        // Fixed: previously `Arc::new(manager)` wrapped the existing Arc into
        // a redundant Arc<Arc<AssetManager>>; cloning the Arc is sufficient to
        // share the manager across tasks.
        let manager_c = Arc::clone(&manager);
        let manager_cc = Arc::clone(&manager);
        let manifest_id_c = manifest_id.clone();

        let res1 = tokio::task::spawn(async move {
            manager_c.fetch_manifest(&manifest_id_c, size).await
        });
        let res2 = tokio::task::spawn(async move {
            manager_cc.fetch_manifest(&manifest_id, size).await
        });

        assert!(res1.await?.is_ok());
        assert!(res2.await?.is_ok());
        assert_eq!(logging.fetch_operations().len(), 1);
        Ok(())
    }
}