1use std::sync::Arc;
5
6use chrono::Utc;
7use libsql_sys::name::NamespaceName;
8use tokio::sync::{mpsc, oneshot};
9use tokio::task::JoinSet;
10use tokio_stream::Stream;
11
12use crate::io::{FileExt, Io, StdIO};
13use crate::segment::compacted::CompactedSegment;
14use crate::segment::Segment;
15
16use super::backend::{Backend, FindSegmentReq};
17use super::scheduler::Scheduler;
18use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
19
20pub struct AsyncStorageLoop<B: Backend, IO: Io, S> {
27 receiver: mpsc::UnboundedReceiver<StorageLoopMessage<S, B::Config>>,
28 scheduler: Scheduler<S, B::Config>,
29 backend: Arc<B>,
30 io: Arc<IO>,
31 max_in_flight: usize,
32 force_shutdown: oneshot::Receiver<()>,
33}
34
35impl<B, FS, S> AsyncStorageLoop<B, FS, S>
36where
37 FS: Io,
38 B: Backend + 'static,
39 S: Segment,
40{
41 #[tracing::instrument(skip(self))]
50 pub async fn run(mut self) {
51 let mut shutting_down = false;
52 let mut in_flight_futs = JoinSet::new();
53 let mut notify_shutdown = None;
54 loop {
56 if shutting_down && self.scheduler.is_empty() {
57 break;
58 }
59
60 while self.scheduler.has_work() && in_flight_futs.len() < self.max_in_flight {
62 let job = self
63 .scheduler
64 .schedule()
65 .expect("scheduler has work, but didn't return a job");
66 in_flight_futs.spawn(job.perform(self.backend.clone(), self.io.clone()));
67 }
68
69 tokio::select! {
70 biased;
71 Some(join_result) = in_flight_futs.join_next(), if !in_flight_futs.is_empty() => {
72 match join_result {
73 Ok(job_result) => {
74 if shutting_down {
76 tracing::info!("processed job, {} jobs remaining", in_flight_futs.len());
77 }
78 self.scheduler.report(job_result).await;
79 }
80 Err(e) => {
81 tracing::error!("fatal error: bottomless job panicked: {e}");
84 std::process::exit(1);
85 }
86 }
87 }
88 msg = self.receiver.recv(), if !shutting_down => {
89 match msg {
90 Some(StorageLoopMessage::StoreReq(req)) => {
91 self.scheduler.register(req);
92 }
93 Some(StorageLoopMessage::DurableFrameNoReq { namespace, ret, config_override }) => {
94 self.fetch_durable_frame_no_async(namespace, ret, config_override);
95 }
96 Some(StorageLoopMessage::Shutdown(ret)) => {
97 notify_shutdown.replace(ret);
98 shutting_down = true;
99 tracing::info!("Storage shutting down");
100 }
101 None => {
102 shutting_down = true;
103 }
104 }
105 }
106 shutdown = &mut self.force_shutdown => {
107 if shutdown.is_ok() {
108 break
109 } else {
110 }
114 }
115 }
116 }
117
118 tracing::info!("Storage shutdown");
119 if let Some(notify) = notify_shutdown {
120 let _ = notify.send(());
121 }
122 }
123
124 fn fetch_durable_frame_no_async(
125 &self,
126 namespace: NamespaceName,
127 ret: oneshot::Sender<super::Result<u64>>,
128 config_override: Option<B::Config>,
129 ) {
130 let backend = self.backend.clone();
131 let config = match config_override {
132 Some(config) => config,
133 None => backend.default_config(),
134 };
135
136 tokio::spawn(async move {
137 let res = backend
138 .meta(&config, &namespace)
139 .await
140 .map(|meta| meta.max_frame_no);
141 let _ = ret.send(res);
142 });
143 }
144}
145
146pub struct BottomlessConfig<C> {
147 pub max_jobs_conccurency: usize,
149 pub max_enqueued_jobs: usize,
151 pub config: C,
152}
153
154enum StorageLoopMessage<S, C> {
155 StoreReq(StoreSegmentRequest<S, C>),
156 DurableFrameNoReq {
157 namespace: NamespaceName,
158 config_override: Option<C>,
159 ret: oneshot::Sender<super::Result<u64>>,
160 },
161 Shutdown(oneshot::Sender<()>),
162}
163
164pub struct AsyncStorage<B: Backend, S> {
165 job_sender: mpsc::UnboundedSender<StorageLoopMessage<S, B::Config>>,
167 force_shutdown: oneshot::Sender<()>,
168 backend: Arc<B>,
169}
170
171impl<B, S> Storage for AsyncStorage<B, S>
172where
173 B: Backend,
174 S: Segment,
175{
176 type Segment = S;
177 type Config = B::Config;
178
179 async fn shutdown(&self) {
180 let (snd, rcv) = oneshot::channel();
181 let _ = self.job_sender.send(StorageLoopMessage::Shutdown(snd));
182 let _ = rcv.await;
183 }
184
185 fn store(
186 &self,
187 namespace: &NamespaceName,
188 segment: Self::Segment,
189 config_override: Option<Self::Config>,
190 on_store_callback: OnStoreCallback,
191 ) {
192 let req = StoreSegmentRequest {
193 namespace: namespace.clone(),
194 segment,
195 created_at: Utc::now(),
196 storage_config_override: config_override,
197 on_store_callback,
198 };
199
200 self.job_sender
201 .send(StorageLoopMessage::StoreReq(req))
202 .expect("bottomless loop was closed before the handle was dropped");
203 }
204
205 async fn durable_frame_no(
206 &self,
207 namespace: &NamespaceName,
208 config_override: Option<Self::Config>,
209 ) -> super::Result<u64> {
210 let config = config_override.unwrap_or_else(|| self.backend.default_config());
211 let meta = self.backend.meta(&config, namespace).await?;
212 Ok(meta.max_frame_no)
213 }
214
215 async fn restore(
216 &self,
217 file: impl crate::io::FileExt,
218 namespace: &NamespaceName,
219 restore_options: RestoreOptions,
220 config_override: Option<Self::Config>,
221 ) -> super::Result<()> {
222 let config = config_override.unwrap_or_else(|| self.backend.default_config());
223 self.backend
224 .restore(&config, &namespace, restore_options, file)
225 .await
226 }
227
228 async fn find_segment(
229 &self,
230 namespace: &NamespaceName,
231 req: FindSegmentReq,
232 config_override: Option<Self::Config>,
233 ) -> super::Result<super::SegmentKey> {
234 let config = config_override.unwrap_or_else(|| self.backend.default_config());
235 let key = self.backend.find_segment(&config, namespace, req).await?;
236 Ok(key)
237 }
238
239 async fn fetch_segment_index(
240 &self,
241 namespace: &NamespaceName,
242 key: &super::SegmentKey,
243 config_override: Option<Self::Config>,
244 ) -> super::Result<fst::Map<Arc<[u8]>>> {
245 let config = config_override.unwrap_or_else(|| self.backend.default_config());
246 let index = self
247 .backend
248 .fetch_segment_index(&config, namespace, key)
249 .await?;
250 Ok(index)
251 }
252
253 async fn fetch_segment_data(
254 &self,
255 namespace: &NamespaceName,
256 key: &super::SegmentKey,
257 config_override: Option<Self::Config>,
258 ) -> super::Result<CompactedSegment<impl FileExt>> {
259 let config = config_override.unwrap_or_else(|| self.backend.default_config());
261 let backend = self.backend.clone();
262 let file = backend
263 .fetch_segment_data(config, namespace.clone(), *key)
264 .await?;
265 let segment = CompactedSegment::open(file).await?;
266 Ok(segment)
267 }
268
269 fn list_segments<'a>(
270 &'a self,
271 namespace: &'a NamespaceName,
272 until: u64,
273 config_override: Option<Self::Config>,
274 ) -> impl Stream<Item = super::Result<super::SegmentInfo>> + 'a {
275 let config = config_override.unwrap_or_else(|| self.backend.default_config());
276 self.backend.list_segments(config, namespace, until)
277 }
278}
279
280pub struct AsyncStorageInitConfig<B> {
281 pub backend: Arc<B>,
282 pub max_in_flight_jobs: usize,
283}
284
285impl<B: Backend, S> AsyncStorage<B, S> {
286 pub async fn new(
287 config: AsyncStorageInitConfig<B>,
288 ) -> (AsyncStorage<B, S>, AsyncStorageLoop<B, StdIO, S>)
289 where
290 B: Backend,
291 S: Segment,
292 {
293 Self::new_with_io(config, Arc::new(StdIO(()))).await
294 }
295
296 pub async fn new_with_io<IO>(
297 config: AsyncStorageInitConfig<B>,
298 io: Arc<IO>,
299 ) -> (AsyncStorage<B, S>, AsyncStorageLoop<B, IO, S>)
300 where
301 B: Backend,
302 IO: Io,
303 S: Segment,
304 {
305 let (job_snd, job_rcv) = tokio::sync::mpsc::unbounded_channel();
306 let (shutdown_snd, shutdown_rcv) = tokio::sync::oneshot::channel();
307 let scheduler = Scheduler::new();
308 let storage_loop = AsyncStorageLoop {
309 receiver: job_rcv,
310 scheduler,
311 backend: config.backend.clone(),
312 io,
313 max_in_flight: config.max_in_flight_jobs,
314 force_shutdown: shutdown_rcv,
315 };
316
317 let this = Self {
318 job_sender: job_snd,
319 force_shutdown: shutdown_snd,
320 backend: config.backend,
321 };
322
323 (this, storage_loop)
324 }
325
326 pub fn backend(&self) -> &B {
327 &self.backend
328 }
329
330 pub fn send_shutdown(self) -> impl FnOnce() {
333 let force_shutdown = {
334 let Self { force_shutdown, .. } = self;
337 force_shutdown
338 };
339
340 || {
341 let _ = force_shutdown.send(());
342 }
343 }
344}