Skip to main content

common/storage/
loader.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::Semaphore;
5use uuid::Uuid;
6
7use crate::{StorageError, StorageRead, StorageResult};
8
9/// Trait for types that can be loaded from storage.
10///
11/// This trait abstracts the loading logic for any type that can be loaded from
12/// storage.
13#[async_trait]
14pub trait Loadable<S>: Sized + Send + Sync {
15    /// Load an instance from storage for the given scope.
16    async fn load(storage: &dyn StorageRead, scope: &S) -> StorageResult<Self>;
17}
18
19/// Synchronization metadata for coordinating with the ingest side.
20pub struct LoadMetadata {
21    pub load_id: Uuid,
22    pub sequence_number: u64,
23}
24
25/// Result of loading a bucket from storage.
26///
27/// Contains the loaded data along with synchronization metadata
28/// to coordinate with the ingest side.
29pub struct LoadResult<T> {
30    /// The loaded data
31    pub data: T,
32    /// Synchronization metadata from the ingest side
33    pub metadata: LoadMetadata,
34}
35
36/// Specification for loading a bucket from storage.
37///
38/// Contains all the information needed to load a bucket, including
39/// the storage snapshot to load from and optional synchronization metadata.
40///
41/// The generic parameter `T` specifies which type to load (e.g., IngestTsdb or QueryTsdb).
42pub struct LoadSpec<T: Loadable<S>, S> {
43    /// The storage snapshot to load from (could be current storage or a point-in-time snapshot)
44    pub storage: Arc<dyn StorageRead>,
45    /// Optional synchronization metadata from the ingest side
46    /// If present, the loader will return these values in the LoadResult
47    pub metadata: Option<LoadMetadata>,
48    /// Phantom data to carry the type information
49    _phantom: std::marker::PhantomData<(T, S)>,
50}
51
52/// Generic loader that manages concurrent loading of Loadable types.
53///
54/// Uses a semaphore to limit the number of concurrent load operations,
55/// preventing resource exhaustion while still allowing some parallelism.
56///
57/// This loader is not generic itself, but its `load()` method is generic,
58/// allowing a single Loader instance to load different types and scopes.
59pub struct Loader {
60    /// Semaphore to limit concurrent load operations
61    semaphore: Arc<Semaphore>,
62}
63
64impl Loader {
65    /// Creates a new Loader with a specified maximum number of concurrent loads.
66    ///
67    /// # Arguments
68    ///
69    /// * `max_concurrent_loads` - Maximum number of load operations that can run concurrently
70    pub fn new(max_concurrent_loads: usize) -> Self {
71        Self {
72            semaphore: Arc::new(Semaphore::new(max_concurrent_loads)),
73        }
74    }
75
76    /// Load a bucket from storage.
77    ///
78    /// Acquires a semaphore permit to limit concurrent loads, then spawns
79    /// an async task to perform the actual loading operation.
80    ///
81    /// If the LoadSpec does not include metadata, a new load_id (UUID) will be
82    /// generated and sequence_number will be set to 0.
83    ///
84    /// Returns a LoadResult containing the loaded data and synchronization metadata.
85    pub async fn load<T: Loadable<S>, S>(
86        &self,
87        spec: LoadSpec<T, S>,
88        scope: &S,
89    ) -> StorageResult<LoadResult<T>> {
90        // Acquire permit (blocks if at capacity)
91        let _permit = self
92            .semaphore
93            .acquire()
94            .await
95            .map_err(|_| StorageError::from_storage("Semaphore closed"))?;
96
97        // Generate metadata if not provided
98        let metadata = spec.metadata.unwrap_or_else(|| LoadMetadata {
99            load_id: Uuid::new_v4(),
100            sequence_number: 0,
101        });
102
103        // Load the data
104        let data = T::load(&*spec.storage, scope).await?;
105
106        Ok(LoadResult { data, metadata })
107    }
108}