1use std::fmt::{self, Formatter};
4use std::mem::size_of;
5use std::path::Path;
6use std::pin::Pin;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::task::Poll;
10
11use aws_config::SdkConfig;
12use aws_sdk_s3::operation::create_bucket::CreateBucketError;
13use aws_sdk_s3::operation::get_object::GetObjectOutput;
14use aws_sdk_s3::primitives::{ByteStream, SdkBody};
15use aws_sdk_s3::types::{CreateBucketConfiguration, Object};
16use aws_sdk_s3::Client;
17use bytes::{Bytes, BytesMut};
18use chrono::{DateTime, Utc};
19use http_body::{Frame as HttpFrame, SizeHint};
20use libsql_sys::name::NamespaceName;
21use roaring::RoaringBitmap;
22use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
23use tokio_stream::Stream;
24use tokio_util::sync::ReusableBoxFuture;
25use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64};
26use zerocopy::{AsBytes, FromBytes, FromZeroes};
27
28use super::{Backend, FindSegmentReq, SegmentMeta};
29use crate::io::buf::ZeroCopyBuf;
30use crate::io::compat::copy_to_file;
31use crate::io::{FileExt, Io, StdIO};
32use crate::segment::compacted::CompactedSegmentDataHeader;
33use crate::segment::Frame;
34use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey};
35use crate::LIBSQL_MAGIC;
36
/// Storage backend that keeps WAL segments and their indexes as objects in an S3 bucket.
pub struct S3Backend<IO> {
    /// S3 client; built with path-style addressing forced on.
    client: Client,
    /// Configuration handed out by `Backend::default_config`.
    default_config: Arc<S3Config>,
    /// IO implementation used to create the temporary files fetched segments are written to.
    io: IO,
}
42impl S3Backend<StdIO> {
43 pub async fn from_sdk_config(
44 aws_config: SdkConfig,
45 bucket: String,
46 cluster_id: String,
47 ) -> Result<S3Backend<StdIO>> {
48 Self::from_sdk_config_with_io(aws_config, bucket, cluster_id, StdIO(())).await
49 }
50}
51
/// Header prepended to every segment index object uploaded by `store`.
///
/// The header is serialized by copying its raw bytes (`as_bytes`) and read back with
/// `read_exact` into `as_bytes_mut`, so the field order and types define the on-disk format.
/// All fields are little-endian `zerocopy` byteorder integers.
#[repr(C)]
#[derive(Copy, Clone, Debug, AsBytes, FromZeroes, FromBytes)]
struct SegmentIndexHeader {
    /// Always `LIBSQL_MAGIC` when written by `store`.
    magic: lu64,
    /// Format version; `store` writes `1`.
    version: lu16,
    /// Length in bytes of the index payload that follows the header.
    len: lu64,
    /// CRC32 (`crc32fast::hash`) of the index payload.
    checksum: lu32,
}
61
62impl<IO: Io> S3Backend<IO> {
63 #[doc(hidden)]
64 pub async fn from_sdk_config_with_io(
65 aws_config: SdkConfig,
66 bucket: String,
67 cluster_id: String,
68 io: IO,
69 ) -> Result<Self> {
70 let config = aws_sdk_s3::Config::new(&aws_config)
71 .to_builder()
72 .force_path_style(true)
73 .build();
74
75 let region = config.region().expect("region must be configured").clone();
76
77 let client = Client::from_conf(config);
78 let config = S3Config {
79 bucket,
80 cluster_id,
81 aws_config,
82 };
83
84 let bucket_config = CreateBucketConfiguration::builder()
85 .location_constraint(
86 aws_sdk_s3::types::BucketLocationConstraint::from_str(®ion.to_string()).unwrap(),
87 )
88 .build();
89
90 let create_bucket_ret = client
93 .create_bucket()
94 .create_bucket_configuration(bucket_config)
95 .bucket(&config.bucket)
96 .send()
97 .await;
98
99 match create_bucket_ret {
100 Ok(_) => (),
101 Err(e) => {
102 if let Some(service_error) = e.as_service_error() {
103 match service_error {
104 CreateBucketError::BucketAlreadyExists(_)
105 | CreateBucketError::BucketAlreadyOwnedByYou(_) => {
106 tracing::debug!("bucket already exist");
107 }
108 _ => return Err(Error::unhandled(e, "failed to create bucket")),
109 }
110 } else {
111 return Err(Error::unhandled(e, "failed to create bucket"));
112 }
113 }
114 }
115
116 Ok(Self {
117 client,
118 default_config: config.into(),
119 io,
120 })
121 }
122
123 async fn fetch_segment_data_reader(
124 &self,
125 config: &S3Config,
126 folder_key: &FolderKey<'_>,
127 segment_key: &SegmentKey,
128 ) -> Result<impl AsyncRead> {
129 let key = s3_segment_data_key(folder_key, segment_key);
130 let stream = self.s3_get(config, key).await?;
131 Ok(stream.body.into_async_read())
132 }
133
134 async fn fetch_segment_data_inner(
135 &self,
136 config: &S3Config,
137 folder_key: &FolderKey<'_>,
138 segment_key: &SegmentKey,
139 file: &impl FileExt,
140 ) -> Result<CompactedSegmentDataHeader> {
141 let reader = self
142 .fetch_segment_data_reader(config, folder_key, segment_key)
143 .await?;
144 let mut reader = tokio::io::BufReader::with_capacity(8196, reader);
145 while reader.fill_buf().await?.len() < size_of::<CompactedSegmentDataHeader>() {}
146 let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap();
147
148 copy_to_file(reader, file).await?;
149
150 Ok(header)
151 }
152
153 async fn s3_get(&self, config: &S3Config, key: impl ToString) -> Result<GetObjectOutput> {
154 Ok(self
155 .client
156 .get_object()
157 .bucket(&config.bucket)
158 .key(key.to_string())
159 .send()
160 .await
161 .map_err(|e| Error::unhandled(e, "error sending s3 GET request"))?)
162 }
163
164 async fn s3_put(&self, config: &S3Config, key: impl ToString, body: ByteStream) -> Result<()> {
165 self.client
166 .put_object()
167 .bucket(&config.bucket)
168 .body(body)
169 .key(key.to_string())
170 .send()
171 .await
172 .map_err(|e| Error::unhandled(e, "error sending s3 PUT request"))?;
173 Ok(())
174 }
175
176 async fn fetch_segment_index_inner(
177 &self,
178 config: &S3Config,
179 folder_key: &FolderKey<'_>,
180 segment_key: &SegmentKey,
181 ) -> Result<fst::Map<Arc<[u8]>>> {
182 let s3_index_key = s3_segment_index_key(folder_key, segment_key);
183 let mut stream = self
184 .s3_get(config, s3_index_key)
185 .await?
186 .body
187 .into_async_read();
188 let mut header: SegmentIndexHeader = SegmentIndexHeader::new_zeroed();
189 stream.read_exact(header.as_bytes_mut()).await?;
190 if header.magic.get() != LIBSQL_MAGIC && header.version.get() != 1 {
191 return Err(Error::InvalidIndex("index header magic or version invalid"));
192 }
193 let mut data = Vec::with_capacity(header.len.get() as _);
194 while stream.read_buf(&mut data).await? != 0 {}
195 let checksum = crc32fast::hash(&data);
196 if checksum != header.checksum.get() {
197 return Err(Error::InvalidIndex("invalid index data checksum"));
198 }
199 let index =
200 fst::Map::new(data.into()).map_err(|_| Error::InvalidIndex("invalid index bytes"))?;
201 Ok(index)
202 }
203
204 async fn find_segment_by_frame_no(
206 &self,
207 config: &S3Config,
208 folder_key: &FolderKey<'_>,
209 frame_no: u64,
210 ) -> Result<Option<SegmentKey>> {
211 let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
212 let lookup_key = s3_segment_index_ends_before_lookup_key(&folder_key, frame_no);
213
214 let objects = self
215 .client
216 .list_objects_v2()
217 .bucket(&config.bucket)
218 .prefix(lookup_key_prefix.to_string())
219 .start_after(lookup_key.to_string())
220 .send()
221 .await
222 .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
223
224 let Some(contents) = objects.contents().first() else {
225 return Ok(None);
226 };
227 let key = contents.key().expect("misssing key?");
228 let key_path: &Path = key.as_ref();
229
230 let key = SegmentKey::validate_from_path(key_path, &folder_key.namespace);
231
232 Ok(key)
233 }
234
235 #[tracing::instrument(skip(self, config, folder_key))]
238 async fn find_segment_by_timestamp(
239 &self,
240 config: &S3Config,
241 folder_key: &FolderKey<'_>,
242 timestamp: DateTime<Utc>,
243 ) -> Result<Option<SegmentKey>> {
244 let object_to_key = |o: &Object| {
245 let key_path = o.key().unwrap();
246 SegmentKey::validate_from_path(key_path.as_ref(), &folder_key.namespace)
247 };
248
249 let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
250
251 let mut continuation_token = None;
252 loop {
253 let objects = self
254 .client
255 .list_objects_v2()
256 .set_continuation_token(continuation_token.take())
257 .bucket(&config.bucket)
258 .prefix(lookup_key_prefix.to_string())
259 .send()
260 .await
261 .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
262
263 if objects.contents().is_empty() {
265 return Ok(None);
266 }
267
268 let ts = timestamp.timestamp_millis() as u64;
269 let search_result =
270 objects
271 .contents()
272 .binary_search_by_key(&std::cmp::Reverse(ts), |o| {
273 let key = object_to_key(o).unwrap();
274 std::cmp::Reverse(key.timestamp)
275 });
276
277 match search_result {
278 Ok(i) => {
279 let key = object_to_key(&objects.contents()[i]).unwrap();
280 tracing::trace!("found perfect match for `{timestamp}`: {key}");
281 return Ok(Some(key));
282 }
283 Err(i) if i == 0 => {
284 let key = object_to_key(&objects.contents()[i]).unwrap();
287 tracing::trace!("best match for `{timestamp}` is most recent segment: {key}");
288 return Ok(Some(key));
289 }
290 Err(i) if i == objects.contents().len() => {
291 if objects.continuation_token().is_some() {
295 } else {
297 let key = object_to_key(&objects.contents().last().unwrap()).unwrap();
298 return Ok(Some(key));
299 }
300 }
301 Err(i) => {
304 let left_key = object_to_key(&objects.contents()[i - 1]).unwrap();
306 let right_key = object_to_key(&objects.contents()[i]).unwrap();
307 let time_to_left = left_key.timestamp().signed_duration_since(timestamp).abs();
308 let time_to_right =
309 right_key.timestamp().signed_duration_since(timestamp).abs();
310
311 if time_to_left < time_to_right {
312 return Ok(Some(left_key));
313 } else {
314 return Ok(Some(right_key));
315 }
316 }
317 }
318
319 match objects.continuation_token {
320 Some(token) => {
321 continuation_token = Some(token);
322 }
323 None => {
324 unreachable!("the absence of continuation token should be dealt with earlier");
325 }
326 }
327 }
328 }
329
330 async fn restore_latest(
333 &self,
334 config: &S3Config,
335 namespace: &NamespaceName,
336 dest: impl FileExt,
337 ) -> Result<()> {
338 let folder_key = FolderKey {
339 cluster_id: &config.cluster_id,
340 namespace,
341 };
342 let Some(latest_key) = self
343 .find_segment_by_frame_no(config, &folder_key, u64::MAX)
344 .await?
345 else {
346 tracing::info!("nothing to restore for {namespace}");
347 return Ok(());
348 };
349
350 let reader = self
351 .fetch_segment_data_reader(config, &folder_key, &latest_key)
352 .await?;
353 let mut reader = BufReader::new(reader);
354 let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed();
355 reader.read_exact(header.as_bytes_mut()).await?;
356 let db_size = header.size_after.get();
357 let mut seen = RoaringBitmap::new();
358 let mut frame: Frame = Frame::new_zeroed();
359 loop {
360 for _ in 0..header.frame_count.get() {
361 reader.read_exact(frame.as_bytes_mut()).await?;
362 let page_no = frame.header().page_no();
363 if !seen.contains(page_no) {
364 seen.insert(page_no);
365 let offset = (page_no as u64 - 1) * 4096;
366 let buf = ZeroCopyBuf::new_init(frame).map_slice(|f| f.get_ref().data());
367 let (buf, ret) = dest.write_all_at_async(buf, offset).await;
368 ret?;
369 frame = buf.into_inner().into_inner();
370 }
371 }
372
373 if seen.len() == db_size as u64 {
375 break;
376 }
377
378 let next_frame_no = header.start_frame_no.get() - 1;
379 let Some(key) = self
380 .find_segment_by_frame_no(config, &folder_key, next_frame_no)
381 .await?
382 else {
383 todo!("there should be a segment!");
384 };
385 let r = self
386 .fetch_segment_data_reader(config, &folder_key, &key)
387 .await?;
388 reader = BufReader::new(r);
389 reader.read_exact(header.as_bytes_mut()).await?;
390 }
391
392 Ok(())
393 }
394
395 async fn fetch_segment_from_key(
396 &self,
397 config: &S3Config,
398 folder_key: &FolderKey<'_>,
399 segment_key: &SegmentKey,
400 dest_file: &impl FileExt,
401 ) -> Result<fst::Map<Arc<[u8]>>> {
402 let (_, index) = tokio::try_join!(
403 self.fetch_segment_data_inner(config, &folder_key, &segment_key, dest_file),
404 self.fetch_segment_index_inner(config, &folder_key, &segment_key),
405 )?;
406
407 Ok(index)
408 }
409
410 fn list_segments_inner<'a>(
411 &'a self,
412 config: Arc<S3Config>,
413 namespace: &'a NamespaceName,
414 _until: u64,
415 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
416 async_stream::try_stream! {
417 let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace };
418 let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
419
420 let mut continuation_token = None;
421 loop {
422 let objects = self
423 .client
424 .list_objects_v2()
425 .bucket(&config.bucket)
426 .prefix(lookup_key_prefix.to_string())
427 .set_continuation_token(continuation_token.take())
428 .send()
429 .await
430 .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
431
432 for entry in objects.contents() {
433 let key = entry.key().expect("misssing key?");
434 let key_path: &Path = key.as_ref();
435 let Some(key) = SegmentKey::validate_from_path(key_path, &folder_key.namespace) else { continue };
436
437 let infos = SegmentInfo {
438 key,
439 size: entry.size().unwrap_or(0) as usize,
440 };
441
442 yield infos;
443 }
444
445 if objects.is_truncated().unwrap_or(false) {
446 assert!(objects.next_continuation_token.is_some());
447 continuation_token = objects.next_continuation_token;
448 } else {
449 break
450 }
451 }
452 }
453 }
454}
455
/// Settings of an S3 backend: target bucket, SDK configuration and cluster identity.
pub struct S3Config {
    /// Bucket holding every segment data and index object.
    bucket: String,
    /// SDK configuration the backend was built from.
    aws_config: SdkConfig,
    /// Cluster identifier; part of every object key (`v2/clusters/<cluster_id>/...`).
    cluster_id: String,
}
461
/// Key prefix shared by all S3 objects of one namespace in one cluster.
///
/// Displayed as `v2/clusters/<cluster_id>/namespaces/<namespace>`.
struct FolderKey<'a> {
    cluster_id: &'a str,
    namespace: &'a NamespaceName,
}
466
467impl fmt::Display for FolderKey<'_> {
468 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469 write!(
470 f,
471 "v2/clusters/{}/namespaces/{}",
472 self.cluster_id, self.namespace
473 )
474 }
475}
476
477pub struct SegmentDataKey<'a>(&'a FolderKey<'a>, &'a SegmentKey);
478
479impl fmt::Display for SegmentDataKey<'_> {
480 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
481 write!(f, "{}/segments/{}", self.0, self.1)
482 }
483}
484
485fn s3_segment_data_key<'a>(
486 folder_key: &'a FolderKey,
487 segment_key: &'a SegmentKey,
488) -> SegmentDataKey<'a> {
489 SegmentDataKey(folder_key, segment_key)
490}
491
492pub struct SegmentIndexKey<'a>(&'a FolderKey<'a>, &'a SegmentKey);
493
494impl fmt::Display for SegmentIndexKey<'_> {
495 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
496 write!(f, "{}/indexes/{}", self.0, self.1)
497 }
498}
499
500fn s3_segment_index_key<'a>(
501 folder_key: &'a FolderKey,
502 segment_key: &'a SegmentKey,
503) -> SegmentIndexKey<'a> {
504 SegmentIndexKey(folder_key, segment_key)
505}
506
507pub struct SegmentIndexLookupKeyPrefix<'a>(&'a FolderKey<'a>);
508
509impl fmt::Display for SegmentIndexLookupKeyPrefix<'_> {
510 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
511 write!(f, "{}/indexes/", self.0)
512 }
513}
514
515fn s3_segment_index_lookup_key_prefix<'a>(
516 folder_key: &'a FolderKey,
517) -> SegmentIndexLookupKeyPrefix<'a> {
518 SegmentIndexLookupKeyPrefix(folder_key)
519}
520
521pub struct SegmentIndexLookupKey<'a>(&'a FolderKey<'a>, u64);
522
523impl fmt::Display for SegmentIndexLookupKey<'_> {
524 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
525 write!(f, "{}/indexes/{:020}", self.0, u64::MAX - self.1)
526 }
527}
528
529fn s3_segment_index_ends_before_lookup_key<'a>(
531 folder_key: &'a FolderKey,
532 frame_no: u64,
533) -> SegmentIndexLookupKey<'a> {
534 SegmentIndexLookupKey(folder_key, frame_no)
535}
536
537impl<IO> Backend for S3Backend<IO>
538where
539 IO: Io,
540{
541 type Config = Arc<S3Config>;
542
543 async fn store(
544 &self,
545 config: &Self::Config,
546 meta: SegmentMeta,
547 segment_data: impl FileExt,
548 segment_index: Vec<u8>,
549 ) -> Result<()> {
550 let folder_key = FolderKey {
551 cluster_id: &config.cluster_id,
552 namespace: &meta.namespace,
553 };
554 let segment_key = SegmentKey::from(&meta);
555 let s3_data_key = s3_segment_data_key(&folder_key, &segment_key);
556
557 let body = FileStreamBody::new(segment_data).into_byte_stream();
558
559 self.s3_put(config, s3_data_key, body).await?;
560
561 let s3_index_key = s3_segment_index_key(&folder_key, &segment_key);
562
563 let checksum = crc32fast::hash(&segment_index);
564 let header = SegmentIndexHeader {
565 version: 1.into(),
566 len: (segment_index.len() as u64).into(),
567 checksum: checksum.into(),
568 magic: LIBSQL_MAGIC.into(),
569 };
570
571 let mut bytes =
572 BytesMut::with_capacity(size_of::<SegmentIndexHeader>() + segment_index.len());
573 bytes.extend_from_slice(header.as_bytes());
574 bytes.extend_from_slice(&segment_index);
575
576 let body = ByteStream::from(bytes.freeze());
577
578 self.s3_put(config, s3_index_key, body).await?;
579
580 Ok(())
581 }
582
583 async fn meta(
584 &self,
585 config: &Self::Config,
586 namespace: &NamespaceName,
587 ) -> Result<super::DbMeta> {
588 let folder_key = FolderKey {
589 cluster_id: &config.cluster_id,
590 namespace: &namespace,
591 };
592
593 let max_segment_key = self
595 .find_segment_by_frame_no(config, &folder_key, u64::MAX)
596 .await?;
597
598 Ok(super::DbMeta {
599 max_frame_no: max_segment_key.map(|s| s.end_frame_no).unwrap_or(0),
600 })
601 }
602
603 fn default_config(&self) -> Self::Config {
604 self.default_config.clone()
605 }
606
607 async fn restore(
608 &self,
609 config: &Self::Config,
610 namespace: &NamespaceName,
611 restore_options: RestoreOptions,
612 dest: impl FileExt,
613 ) -> Result<()> {
614 match restore_options {
615 RestoreOptions::Latest => self.restore_latest(config, &namespace, dest).await,
616 RestoreOptions::Timestamp(_) => todo!(),
617 }
618 }
619
620 async fn find_segment(
621 &self,
622 config: &Self::Config,
623 namespace: &NamespaceName,
624 req: FindSegmentReq,
625 ) -> Result<SegmentKey> {
626 let folder_key = FolderKey {
627 cluster_id: &config.cluster_id,
628 namespace: &namespace,
629 };
630
631 match req {
632 FindSegmentReq::EndFrameNoLessThan(frame_no) => self
633 .find_segment_by_frame_no(config, &folder_key, frame_no)
634 .await?
635 .ok_or_else(|| Error::SegmentNotFound(req)),
636 FindSegmentReq::Timestamp(ts) => self
637 .find_segment_by_timestamp(config, &folder_key, ts)
638 .await?
639 .ok_or_else(|| Error::SegmentNotFound(req)),
640 }
641 }
642
643 async fn fetch_segment_index(
644 &self,
645 config: &Self::Config,
646 namespace: &NamespaceName,
647 key: &SegmentKey,
648 ) -> Result<fst::Map<Arc<[u8]>>> {
649 let folder_key = FolderKey {
650 cluster_id: &config.cluster_id,
651 namespace: &namespace,
652 };
653 self.fetch_segment_index_inner(config, &folder_key, key)
654 .await
655 }
656
657 async fn fetch_segment_data_to_file(
658 &self,
659 config: &Self::Config,
660 namespace: &NamespaceName,
661 key: &SegmentKey,
662 file: &impl FileExt,
663 ) -> Result<CompactedSegmentDataHeader> {
664 let folder_key = FolderKey {
665 cluster_id: &config.cluster_id,
666 namespace: &namespace,
667 };
668 let header = self
669 .fetch_segment_data_inner(config, &folder_key, key, file)
670 .await?;
671 Ok(header)
672 }
673
674 async fn fetch_segment_data(
675 self: Arc<Self>,
676 config: Self::Config,
677 namespace: NamespaceName,
678 key: SegmentKey,
679 ) -> Result<impl FileExt> {
680 let file = self.io.tempfile()?;
681 self.fetch_segment_data_to_file(&config, &namespace, &key, &file)
682 .await?;
683 Ok(file)
684 }
685
686 fn list_segments<'a>(
687 &'a self,
688 config: Self::Config,
689 namespace: &'a NamespaceName,
690 until: u64,
691 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
692 self.list_segments_inner(config, namespace, until)
693 }
694}
695
/// States of the `FileStreamBody` read loop driven by `poll_frame`.
#[derive(Clone, Copy)]
enum StreamState {
    /// No read in flight; the next poll starts reading the chunk at the current offset.
    Init,
    /// A chunk read has been started and is being polled.
    WaitingChunk,
    /// End of file reached or a read failed; no more frames are produced.
    Done,
}
702
/// HTTP body that streams a file in chunks through `FileExt::read_at_async`.
struct FileStreamBody<F> {
    /// File being streamed; shared so a retryable body can build fresh readers over it.
    inner: Arc<F>,
    /// File offset of the next chunk to read.
    current_offset: u64,
    /// Capacity of the buffer handed to each read.
    chunk_size: usize,
    /// Current position in the read state machine.
    state: StreamState,
    /// Chunk read in flight; holds a never-ready placeholder until the first read starts.
    fut: ReusableBoxFuture<'static, std::io::Result<Bytes>>,
}
710
711impl<F> FileStreamBody<F> {
712 fn new(inner: F) -> Self {
713 Self::new_inner(inner.into())
714 }
715
716 fn new_inner(inner: Arc<F>) -> Self {
717 Self {
718 inner,
719 current_offset: 0,
720 chunk_size: 4096,
721 state: StreamState::Init,
722 fut: ReusableBoxFuture::new(std::future::pending()),
723 }
724 }
725
726 fn into_byte_stream(self) -> ByteStream
727 where
728 F: FileExt,
729 {
730 let body = SdkBody::retryable(move || {
731 let s = Self::new_inner(self.inner.clone());
732 SdkBody::from_body_1_x(s)
733 });
734
735 ByteStream::new(body)
736 }
737}
738
739impl<F> http_body::Body for FileStreamBody<F>
740where
741 F: FileExt,
742{
743 type Data = Bytes;
744 type Error = std::io::Error;
745
746 fn poll_frame(
747 mut self: Pin<&mut Self>,
748 cx: &mut std::task::Context<'_>,
749 ) -> Poll<Option<Result<HttpFrame<Self::Data>, Self::Error>>> {
750 loop {
751 match self.state {
752 StreamState::Init => {
753 let f = self.inner.clone();
754 let chunk_size = self.chunk_size;
755 let current_offset = self.current_offset;
756 let fut = async move {
757 let buf = BytesMut::with_capacity(chunk_size);
758 let (buf, ret) = f.read_at_async(buf, current_offset).await;
759 ret.map(|_| buf.freeze())
760 };
761 self.fut.set(fut);
762 self.state = StreamState::WaitingChunk;
763 }
764 StreamState::WaitingChunk => match self.fut.poll(cx) {
765 Poll::Ready(Ok(buf)) => {
766 if buf.is_empty() {
768 self.state = StreamState::Done;
769 return Poll::Ready(None);
770 } else {
771 self.state = StreamState::Init;
772 self.current_offset += buf.len() as u64;
773 return Poll::Ready(Some(Ok(HttpFrame::data(buf))));
774 }
775 }
776 Poll::Ready(Err(e)) => {
777 self.state = StreamState::Done;
778 return Poll::Ready(Some(Err(e)));
779 }
780 Poll::Pending => return Poll::Pending,
781 },
782 StreamState::Done => return Poll::Ready(None),
783 }
784 }
785 }
786
787 fn size_hint(&self) -> SizeHint {
788 match self.inner.len() {
789 Ok(n) => SizeHint::with_exact(n),
790 Err(_) => SizeHint::new(),
791 }
792 }
793}
794
#[cfg(test)]
mod tests {
    use std::path::Path;

    use aws_config::{BehaviorVersion, Region, SdkConfig};
    use aws_sdk_s3::config::SharedCredentialsProvider;
    use chrono::Utc;
    use fst::MapBuilder;
    use s3s::auth::SimpleAuth;
    use s3s::service::{S3ServiceBuilder, SharedS3Service};
    use uuid::Uuid;

    use crate::io::StdIO;

    use super::*;

    /// Creates an `s3s` S3 service backed by the local directory `dir`.
    ///
    /// Also builds an `SdkConfig` whose HTTP client (`s3s_aws::Client`) hands requests to that
    /// service. Returns the config together with the shared service handle.
    #[track_caller]
    fn setup(dir: impl AsRef<Path>) -> (SdkConfig, SharedS3Service) {
        std::fs::create_dir_all(&dir).unwrap();
        let s3_impl = s3s_fs::FileSystem::new(dir).unwrap();

        let cred = aws_credential_types::Credentials::for_tests();

        let mut s3 = S3ServiceBuilder::new(s3_impl);
        // The service only accepts the test credentials the SDK config below signs with.
        s3.set_auth(SimpleAuth::from_single(
            cred.access_key_id(),
            cred.secret_access_key(),
        ));
        s3.set_base_domain("localhost:8014");
        let s3 = s3.build().into_shared();

        let client = s3s_aws::Client::from(s3.clone());

        let config = aws_config::SdkConfig::builder()
            .http_client(client)
            .behavior_version(BehaviorVersion::latest())
            .region(Region::from_static("us-west-2"))
            .credentials_provider(SharedCredentialsProvider::new(cred))
            .endpoint_url("http://localhost:8014")
            .build();

        (config, s3)
    }

    /// End-to-end check of the S3 backend.
    ///
    /// It stores two consecutive segments, checks `meta` after each store, looks up the first
    /// segment by frame number, and reads its index back.
    #[tokio::test]
    async fn s3_basic() {
        let _ = tracing_subscriber::fmt::try_init();
        let dir = tempfile::tempdir().unwrap();
        let (aws_config, _s3) = setup(&dir);

        // Same bucket and cluster as the backend below.
        let s3_config = Arc::new(S3Config {
            bucket: "testbucket".into(),
            aws_config: aws_config.clone(),
            cluster_id: "123456789".into(),
        });

        let storage = S3Backend::from_sdk_config_with_io(
            aws_config,
            "testbucket".into(),
            "123456789".into(),
            StdIO(()),
        )
        .await
        .unwrap();

        // Dummy segment data file shared by both stored segments.
        let f_path = dir.path().join("fs-segments");
        std::fs::write(&f_path, vec![123; 8092]).unwrap();

        let ns = NamespaceName::from_string("foobarbaz".into());

        // First segment: frames 1..=64, index maps page 42 -> 42.
        let mut builder = MapBuilder::memory();
        builder.insert(42u32.to_be_bytes(), 42).unwrap();
        let index = builder.into_inner().unwrap();
        storage
            .store(
                &s3_config,
                SegmentMeta {
                    namespace: ns.clone(),
                    segment_id: Uuid::new_v4(),
                    start_frame_no: 1u64.into(),
                    end_frame_no: 64u64.into(),
                    segment_timestamp: Utc::now(),
                },
                std::fs::File::open(&f_path).unwrap(),
                index,
            )
            .await
            .unwrap();

        let db_meta = storage.meta(&s3_config, &ns).await.unwrap();
        assert_eq!(db_meta.max_frame_no, 64);

        // Second segment: frames 64..=128, index maps page 44 -> 44.
        let mut builder = MapBuilder::memory();
        builder.insert(44u32.to_be_bytes(), 44).unwrap();
        let index = builder.into_inner().unwrap();
        storage
            .store(
                &s3_config,
                SegmentMeta {
                    namespace: ns.clone(),
                    segment_id: Uuid::new_v4(),
                    start_frame_no: 64u64.into(),
                    end_frame_no: 128u64.into(),
                    segment_timestamp: Utc::now(),
                },
                std::fs::File::open(&f_path).unwrap(),
                index,
            )
            .await
            .unwrap();

        let db_meta = storage.meta(&s3_config, &ns).await.unwrap();
        assert_eq!(db_meta.max_frame_no, 128);

        // Looking up frame 65 must resolve to the first segment, which ends at 64.
        let key = storage
            .find_segment(&s3_config, &ns, FindSegmentReq::EndFrameNoLessThan(65))
            .await
            .unwrap();
        assert_eq!(key.start_frame_no, 1);
        assert_eq!(key.end_frame_no, 64);

        // The index fetched for that segment is the one stored first.
        let index = storage
            .fetch_segment_index(&s3_config, &ns, &key)
            .await
            .unwrap();
        assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);
    }
}