libsql_wal/storage/
async_storage.rs

//! `AsyncStorage` is a `Storage` implementation that defers storage to a background task. The
//! durable frame_no is notified asynchronously.
3
4use std::sync::Arc;
5
6use chrono::Utc;
7use libsql_sys::name::NamespaceName;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinSet;
10use tokio_stream::Stream;
11
12use crate::io::{FileExt, Io, StdIO};
13use crate::segment::compacted::CompactedSegment;
14use crate::segment::Segment;
15
16use super::backend::{Backend, FindSegmentReq};
17use super::scheduler::Scheduler;
18use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
19
20/// Background loop task state.
21///
22/// The background loop task is not allowed to exit, unless it was notified for shutdown.
23///
24/// On shutdown, attempts to empty the queue, and flush the receiver. When the last handle of the
25/// receiver is dropped, and the queue is empty, exit.
26pub struct AsyncStorageLoop<B: Backend, IO: Io, S> {
27    receiver: mpsc::UnboundedReceiver<StorageLoopMessage<S, B::Config>>,
28    scheduler: Scheduler<S, B::Config>,
29    backend: Arc<B>,
30    io: Arc<IO>,
31    max_in_flight: usize,
32    force_shutdown: oneshot::Receiver<()>,
33}
34
35impl<B, FS, S> AsyncStorageLoop<B, FS, S>
36where
37    FS: Io,
38    B: Backend + 'static,
39    S: Segment,
40{
41    /// Schedules durability jobs. This loop is not allowed to fail, or lose jobs.
42    /// A job is prepared by calling `Scheduler::prepare(..)`. The job is spawned, and it returns a
43    /// `JobResult`, which is then returned to the scheduler by calling `Scheduler::report(..)`.
44    /// When a request is received, it is immediately scheduled by calling `Scheduler::register`
45    /// with it.
46    ///
47    /// The loop is only allowed to shutdown if the receiver is closed, and the scheduler is empty,
48    /// or if `force_shutdown` is called, in which case everything is dropped in place.
49    #[tracing::instrument(skip(self))]
50    pub async fn run(mut self) {
51        let mut shutting_down = false;
52        let mut in_flight_futs = JoinSet::new();
53        let mut notify_shutdown = None;
54        // run the loop until shutdown.
55        loop {
56            if shutting_down && self.scheduler.is_empty() {
57                break;
58            }
59
60            // schedule as much work as possible
61            while self.scheduler.has_work() && in_flight_futs.len() < self.max_in_flight {
62                let job = self
63                    .scheduler
64                    .schedule()
65                    .expect("scheduler has work, but didn't return a job");
66                in_flight_futs.spawn(job.perform(self.backend.clone(), self.io.clone()));
67            }
68
69            tokio::select! {
70                biased;
71                Some(join_result) = in_flight_futs.join_next(), if !in_flight_futs.is_empty() => {
72                    match join_result {
73                        Ok(job_result) => {
74                            // if shutting down, log progess:
75                            if shutting_down {
76                                tracing::info!("processed job, {} jobs remaining", in_flight_futs.len());
77                            }
78                            self.scheduler.report(job_result).await;
79                        }
80                        Err(e) => {
81                            // job panicked. report and exit process. The program is crippled, from
82                            // now on, so we just exit, and hope to restart on a fresh state.
83                            tracing::error!("fatal error: bottomless job panicked: {e}");
84                            std::process::exit(1);
85                        }
86                    }
87                }
88                msg = self.receiver.recv(), if !shutting_down => {
89                    match msg {
90                        Some(StorageLoopMessage::StoreReq(req)) => {
91                            self.scheduler.register(req);
92                        }
93                        Some(StorageLoopMessage::DurableFrameNoReq { namespace, ret, config_override }) => {
94                            self.fetch_durable_frame_no_async(namespace, ret, config_override);
95                        }
96                        Some(StorageLoopMessage::Shutdown(ret)) => {
97                            notify_shutdown.replace(ret);
98                            shutting_down = true;
99                            tracing::info!("Storage shutting down");
100                        }
101                        None => {
102                            shutting_down = true;
103                        }
104                    }
105                }
106                shutdown = &mut self.force_shutdown => {
107                    if shutdown.is_ok() {
108                        break
109                    } else {
110                        // force_shutdown sender was dropped without sending a message (likely a
111                        // bug). Log and default to graceful shutdown.
112                        // tracing::error!("bottomless force shutdown handle dropped without notifying; shutting down gracefully");
113                    }
114                }
115            }
116        }
117
118        tracing::info!("Storage shutdown");
119        if let Some(notify) = notify_shutdown {
120            let _ = notify.send(());
121        }
122    }
123
124    fn fetch_durable_frame_no_async(
125        &self,
126        namespace: NamespaceName,
127        ret: oneshot::Sender<super::Result<u64>>,
128        config_override: Option<B::Config>,
129    ) {
130        let backend = self.backend.clone();
131        let config = match config_override {
132            Some(config) => config,
133            None => backend.default_config(),
134        };
135
136        tokio::spawn(async move {
137            let res = backend
138                .meta(&config, &namespace)
139                .await
140                .map(|meta| meta.max_frame_no);
141            let _ = ret.send(res);
142        });
143    }
144}
145
/// Bottomless storage settings: job limits plus the backend-specific configuration `C`.
pub struct BottomlessConfig<C> {
    /// The maximum number of store jobs that can be processed concurrently.
    ///
    /// NOTE: the field name is misspelled; renaming it would break existing callers.
    pub max_jobs_conccurency: usize,
    /// The maximum number of jobs that can be enqueued before throttling.
    pub max_enqueued_jobs: usize,
    /// Backend-specific configuration.
    pub config: C,
}
153
154enum StorageLoopMessage<S, C> {
155    StoreReq(StoreSegmentRequest<S, C>),
156    DurableFrameNoReq {
157        namespace: NamespaceName,
158        config_override: Option<C>,
159        ret: oneshot::Sender<super::Result<u64>>,
160    },
161    Shutdown(oneshot::Sender<()>),
162}
163
164pub struct AsyncStorage<B: Backend, S> {
165    /// send request to the main loop
166    job_sender: mpsc::UnboundedSender<StorageLoopMessage<S, B::Config>>,
167    force_shutdown: oneshot::Sender<()>,
168    backend: Arc<B>,
169}
170
171impl<B, S> Storage for AsyncStorage<B, S>
172where
173    B: Backend,
174    S: Segment,
175{
176    type Segment = S;
177    type Config = B::Config;
178
179    async fn shutdown(&self) {
180        let (snd, rcv) = oneshot::channel();
181        let _ = self.job_sender.send(StorageLoopMessage::Shutdown(snd));
182        let _ = rcv.await;
183    }
184
185    fn store(
186        &self,
187        namespace: &NamespaceName,
188        segment: Self::Segment,
189        config_override: Option<Self::Config>,
190        on_store_callback: OnStoreCallback,
191    ) {
192        let req = StoreSegmentRequest {
193            namespace: namespace.clone(),
194            segment,
195            created_at: Utc::now(),
196            storage_config_override: config_override,
197            on_store_callback,
198        };
199
200        self.job_sender
201            .send(StorageLoopMessage::StoreReq(req))
202            .expect("bottomless loop was closed before the handle was dropped");
203    }
204
205    async fn durable_frame_no(
206        &self,
207        namespace: &NamespaceName,
208        config_override: Option<Self::Config>,
209    ) -> super::Result<u64> {
210        let config = config_override.unwrap_or_else(|| self.backend.default_config());
211        let meta = self.backend.meta(&config, namespace).await?;
212        Ok(meta.max_frame_no)
213    }
214
215    async fn restore(
216        &self,
217        file: impl crate::io::FileExt,
218        namespace: &NamespaceName,
219        restore_options: RestoreOptions,
220        config_override: Option<Self::Config>,
221    ) -> super::Result<()> {
222        let config = config_override.unwrap_or_else(|| self.backend.default_config());
223        self.backend
224            .restore(&config, &namespace, restore_options, file)
225            .await
226    }
227
228    async fn find_segment(
229        &self,
230        namespace: &NamespaceName,
231        req: FindSegmentReq,
232        config_override: Option<Self::Config>,
233    ) -> super::Result<super::SegmentKey> {
234        let config = config_override.unwrap_or_else(|| self.backend.default_config());
235        let key = self.backend.find_segment(&config, namespace, req).await?;
236        Ok(key)
237    }
238
239    async fn fetch_segment_index(
240        &self,
241        namespace: &NamespaceName,
242        key: &super::SegmentKey,
243        config_override: Option<Self::Config>,
244    ) -> super::Result<fst::Map<Arc<[u8]>>> {
245        let config = config_override.unwrap_or_else(|| self.backend.default_config());
246        let index = self
247            .backend
248            .fetch_segment_index(&config, namespace, key)
249            .await?;
250        Ok(index)
251    }
252
253    async fn fetch_segment_data(
254        &self,
255        namespace: &NamespaceName,
256        key: &super::SegmentKey,
257        config_override: Option<Self::Config>,
258    ) -> super::Result<CompactedSegment<impl FileExt>> {
259        // TODO: make async
260        let config = config_override.unwrap_or_else(|| self.backend.default_config());
261        let backend = self.backend.clone();
262        let file = backend
263            .fetch_segment_data(config, namespace.clone(), *key)
264            .await?;
265        let segment = CompactedSegment::open(file).await?;
266        Ok(segment)
267    }
268
269    fn list_segments<'a>(
270        &'a self,
271        namespace: &'a NamespaceName,
272        until: u64,
273        config_override: Option<Self::Config>,
274    ) -> impl Stream<Item = super::Result<super::SegmentInfo>> + 'a {
275        let config = config_override.unwrap_or_else(|| self.backend.default_config());
276        self.backend.list_segments(config, namespace, until)
277    }
278}
279
/// Parameters for building an [`AsyncStorage`] and its background loop.
pub struct AsyncStorageInitConfig<B> {
    /// Backend shared by the storage handle and its background loop.
    pub backend: Arc<B>,
    /// Maximum number of store jobs the background loop runs at the same time.
    pub max_in_flight_jobs: usize,
}
284
285impl<B: Backend, S> AsyncStorage<B, S> {
286    pub async fn new(
287        config: AsyncStorageInitConfig<B>,
288    ) -> (AsyncStorage<B, S>, AsyncStorageLoop<B, StdIO, S>)
289    where
290        B: Backend,
291        S: Segment,
292    {
293        Self::new_with_io(config, Arc::new(StdIO(()))).await
294    }
295
296    pub async fn new_with_io<IO>(
297        config: AsyncStorageInitConfig<B>,
298        io: Arc<IO>,
299    ) -> (AsyncStorage<B, S>, AsyncStorageLoop<B, IO, S>)
300    where
301        B: Backend,
302        IO: Io,
303        S: Segment,
304    {
305        let (job_snd, job_rcv) = tokio::sync::mpsc::unbounded_channel();
306        let (shutdown_snd, shutdown_rcv) = tokio::sync::oneshot::channel();
307        let scheduler = Scheduler::new();
308        let storage_loop = AsyncStorageLoop {
309            receiver: job_rcv,
310            scheduler,
311            backend: config.backend.clone(),
312            io,
313            max_in_flight: config.max_in_flight_jobs,
314            force_shutdown: shutdown_rcv,
315        };
316
317        let this = Self {
318            job_sender: job_snd,
319            force_shutdown: shutdown_snd,
320            backend: config.backend,
321        };
322
323        (this, storage_loop)
324    }
325
326    pub fn backend(&self) -> &B {
327        &self.backend
328    }
329
330    /// send shutdown signal to bottomless.
331    /// return a function that can be called to force shutdown, if necessary
332    pub fn send_shutdown(self) -> impl FnOnce() {
333        let force_shutdown = {
334            // we drop the sender, the loop will finish processing scheduled job and exit
335            // gracefully.
336            let Self { force_shutdown, .. } = self;
337            force_shutdown
338        };
339
340        || {
341            let _ = force_shutdown.send(());
342        }
343    }
344}