common/storage/loader.rs
1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::Semaphore;
5use uuid::Uuid;
6
7use crate::{StorageError, StorageRead, StorageResult};
8
9/// Trait for types that can be loaded from storage.
10///
11/// This trait abstracts the loading logic for any type that can be loaded from
12/// storage.
13#[async_trait]
14pub trait Loadable<S>: Sized + Send + Sync {
15 /// Load an instance from storage for the given scope.
16 async fn load(storage: &dyn StorageRead, scope: &S) -> StorageResult<Self>;
17}
18
19/// Synchronization metadata for coordinating with the ingest side.
20pub struct LoadMetadata {
21 pub load_id: Uuid,
22 pub sequence_number: u64,
23}
24
/// Result of loading a bucket from storage.
///
/// Pairs the loaded value with the synchronization metadata that applies to
/// it, so the caller can coordinate with the ingest side.
///
/// `T` is the type of the loaded value.
pub struct LoadResult<T> {
    /// The loaded data.
    pub data: T,
    /// Synchronization metadata for this load: either the metadata supplied
    /// with the load request or metadata generated by the loader.
    pub metadata: LoadMetadata,
}
35
36/// Specification for loading a bucket from storage.
37///
38/// Contains all the information needed to load a bucket, including
39/// the storage snapshot to load from and optional synchronization metadata.
40///
41/// The generic parameter `T` specifies which type to load (e.g., IngestTsdb or QueryTsdb).
42pub struct LoadSpec<T: Loadable<S>, S> {
43 /// The storage snapshot to load from (could be current storage or a point-in-time snapshot)
44 pub storage: Arc<dyn StorageRead>,
45 /// Optional synchronization metadata from the ingest side
46 /// If present, the loader will return these values in the LoadResult
47 pub metadata: Option<LoadMetadata>,
48 /// Phantom data to carry the type information
49 _phantom: std::marker::PhantomData<(T, S)>,
50}
51
/// Generic loader that manages concurrent loading of Loadable types.
///
/// Uses a semaphore to limit the number of concurrent load operations,
/// preventing resource exhaustion while still allowing some parallelism.
///
/// This loader is not generic itself, but its `load()` method is generic,
/// allowing a single Loader instance to load different types and scopes.
pub struct Loader {
    /// Semaphore to limit concurrent load operations; each in-flight load
    /// holds one permit until it finishes.
    semaphore: Arc<Semaphore>,
}
63
64impl Loader {
65 /// Creates a new Loader with a specified maximum number of concurrent loads.
66 ///
67 /// # Arguments
68 ///
69 /// * `max_concurrent_loads` - Maximum number of load operations that can run concurrently
70 pub fn new(max_concurrent_loads: usize) -> Self {
71 Self {
72 semaphore: Arc::new(Semaphore::new(max_concurrent_loads)),
73 }
74 }
75
76 /// Load a bucket from storage.
77 ///
78 /// Acquires a semaphore permit to limit concurrent loads, then spawns
79 /// an async task to perform the actual loading operation.
80 ///
81 /// If the LoadSpec does not include metadata, a new load_id (UUID) will be
82 /// generated and sequence_number will be set to 0.
83 ///
84 /// Returns a LoadResult containing the loaded data and synchronization metadata.
85 pub async fn load<T: Loadable<S>, S>(
86 &self,
87 spec: LoadSpec<T, S>,
88 scope: &S,
89 ) -> StorageResult<LoadResult<T>> {
90 // Acquire permit (blocks if at capacity)
91 let _permit = self
92 .semaphore
93 .acquire()
94 .await
95 .map_err(|_| StorageError::from_storage("Semaphore closed"))?;
96
97 // Generate metadata if not provided
98 let metadata = spec.metadata.unwrap_or_else(|| LoadMetadata {
99 load_id: Uuid::new_v4(),
100 sequence_number: 0,
101 });
102
103 // Load the data
104 let data = T::load(&*spec.storage, scope).await?;
105
106 Ok(LoadResult { data, metadata })
107 }
108}