1use std::collections::{BTreeMap, BTreeSet};
2use std::fs::{self, File, OpenOptions};
3use std::io;
4use std::io::ErrorKind;
5use std::net::SocketAddr;
6use std::os::unix::fs::{FileExt, OpenOptionsExt};
7use std::path::{Path, PathBuf};
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
9use std::sync::{mpsc, Arc, RwLock};
10use std::thread;
11use std::time::{Duration, Instant, SystemTime};
12
13use anyhow::{Context, Result};
14use arc_swap::ArcSwap;
15use crossbeam_skiplist::SkipMap;
16use fst::{IntoStreamer, Streamer};
17use futures::TryStreamExt;
18use hyper_util::rt::{TokioExecutor, TokioIo};
19use hyper_util::server::conn::auto::Builder as ConnBuilder;
20#[cfg(target_os = "linux")]
21use nix::libc;
22use s3s::auth::SimpleAuth;
23use s3s::dto::*;
24use s3s::service::S3ServiceBuilder;
25use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
26use socket2::{Domain, Protocol, Socket, Type};
27
28use crate::module_d;
29
/// Alignment, in bytes, that every group flush has to satisfy: the flush path checks the buffer
/// address, the buffer length and the file offset against it, and configuration validation
/// requires `group_bytes` to be a multiple of it.
const ALIGNMENT: usize = 1 << 12;
/// Size of the header written in front of every payload: three little-endian `u32` values
/// (stored length, original length, CRC32C).
const RECORD_HEADER_BYTES: usize = 3 * std::mem::size_of::<u32>();
32
/// Runtime configuration for the module-f S3 gateway.
#[derive(Clone, Debug)]
pub struct ModuleFConfig {
    /// Socket address the workers listen on; parsed as a `SocketAddr` when serving starts.
    pub listen_addr: String,
    /// Number of worker threads to spawn; must be greater than zero.
    pub workers: usize,
    /// Path of the storage target that is opened for direct I/O.
    pub target_path: PathBuf,
    /// Buffered byte count at which a group is flushed; must be a non-zero multiple of the
    /// direct-I/O alignment.
    pub group_bytes: usize,
    /// Number of L0 index entries at which a compaction is scheduled; must be greater than zero.
    pub compaction_threshold: usize,
    /// Interval, in milliseconds, used as the group-commit flush timeout; must be greater than
    /// zero.
    pub flush_interval_ms: u64,
    /// Bucket that is registered at startup; must not be blank.
    pub default_bucket: String,
    /// Access key handed to the S3 authentication layer; must not be blank.
    pub access_key: String,
    /// Secret key handed to the S3 authentication layer; must not be blank.
    pub secret_key: String,
    /// When `true`, a regular file is used if the target cannot be opened directly.
    pub allow_file_fallback: bool,
    /// When `true`, startup fails unless the io_uring availability probe succeeds.
    pub require_io_uring: bool,
}
47
48#[derive(Debug, Clone)]
49struct StoredObjectMeta {
50 payload_offset: u64,
51 stored_len: u32,
52 original_len: u32,
53 crc32c: u32,
54 last_modified: Timestamp,
55}
56
57#[derive(Debug)]
58struct PendingRecord {
59 record_start: usize,
60 stored_len: u32,
61 original_len: u32,
62 crc32c: u32,
63 response_tx: tokio::sync::oneshot::Sender<Result<StoredObjectMeta>>,
64}
65
66#[derive(Debug)]
67struct AppendRequest {
68 payload: Vec<u8>,
69 original_len: u32,
70 crc32c: u32,
71 response_tx: tokio::sync::oneshot::Sender<Result<StoredObjectMeta>>,
72}
73
74#[derive(Clone)]
75struct GroupCommitAppender {
76 submit_tx: mpsc::Sender<AppendRequest>,
77 read_path: PathBuf,
78 mode: String,
79}
80
81impl GroupCommitAppender {
82 fn start(config: &ModuleFConfig) -> Result<Self> {
83 if config.require_io_uring && !module_d::io_uring_available() {
84 anyhow::bail!("module-f requires io_uring, but probe failed in current environment");
85 }
86
87 let (file, mode, resolved_path) =
88 open_target_file(&config.target_path, config.allow_file_fallback).with_context(
89 || {
90 format!(
91 "failed to open module-f target {}",
92 config.target_path.display()
93 )
94 },
95 )?;
96
97 let (submit_tx, submit_rx) = mpsc::channel::<AppendRequest>();
98 let group_bytes = config.group_bytes;
99 let flush_interval = Duration::from_millis(config.flush_interval_ms.max(1));
100
101 thread::Builder::new()
102 .name("module-f-group-commit".to_string())
103 .spawn(move || {
104 if let Err(err) =
105 run_group_commit_loop(file, submit_rx, group_bytes, flush_interval)
106 {
107 eprintln!("module-f appender thread exiting with error: {err:#}");
108 }
109 })
110 .context("failed spawning module-f group-commit thread")?;
111
112 Ok(Self {
113 submit_tx,
114 read_path: resolved_path,
115 mode,
116 })
117 }
118
119 async fn append_payload(
120 &self,
121 payload: Vec<u8>,
122 original_len: u32,
123 crc32c: u32,
124 ) -> Result<StoredObjectMeta> {
125 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
126 self.submit_tx
127 .send(AppendRequest {
128 payload,
129 original_len,
130 crc32c,
131 response_tx,
132 })
133 .map_err(|_| anyhow::anyhow!("module-f appender queue closed"))?;
134
135 response_rx
136 .await
137 .context("module-f appender response channel dropped")?
138 }
139
140 fn read_payload(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
141 let file = File::open(&self.read_path)
142 .with_context(|| format!("failed opening read path {}", self.read_path.display()))?;
143
144 let mut buffer = vec![0_u8; len];
145 let mut filled = 0usize;
146 while filled < len {
147 let n = file
148 .read_at(&mut buffer[filled..], offset + filled as u64)
149 .context("module-f read_at failed")?;
150 if n == 0 {
151 anyhow::bail!(
152 "unexpected EOF while reading payload at offset {} (wanted {}, got {})",
153 offset,
154 len,
155 filled
156 );
157 }
158 filled += n;
159 }
160
161 Ok(buffer)
162 }
163}
164
165#[derive(Debug)]
166struct ImmutableIndex {
167 entries: BTreeMap<String, StoredObjectMeta>,
168 fst: fst::Map<Vec<u8>>,
169}
170
171impl ImmutableIndex {
172 fn empty() -> Self {
173 let builder = fst::MapBuilder::memory();
174 let bytes = builder.into_inner().unwrap_or_default();
175 let fst = fst::Map::new(bytes).unwrap();
176 Self {
177 entries: BTreeMap::new(),
178 fst,
179 }
180 }
181}
182
183#[derive(Debug)]
184struct CompactionJob {
185 delta: Vec<(String, Option<StoredObjectMeta>)>,
186}
187
188#[derive(Clone)]
189struct LsmIndex {
190 l0: Arc<SkipMap<String, Option<StoredObjectMeta>>>,
191 l1: Arc<ArcSwap<ImmutableIndex>>,
192 compaction_threshold: usize,
193 compaction_in_progress: Arc<AtomicBool>,
194 compaction_count: Arc<AtomicU64>,
195 compaction_total_ns: Arc<AtomicU64>,
196 compaction_tx: mpsc::Sender<CompactionJob>,
197}
198
199impl LsmIndex {
200 fn new(compaction_threshold: usize) -> Result<Self> {
201 let l0 = Arc::new(SkipMap::<String, Option<StoredObjectMeta>>::new());
202 let l1 = Arc::new(ArcSwap::from_pointee(ImmutableIndex::empty()));
203 let compaction_in_progress = Arc::new(AtomicBool::new(false));
204 let compaction_count = Arc::new(AtomicU64::new(0));
205 let compaction_total_ns = Arc::new(AtomicU64::new(0));
206 let (compaction_tx, compaction_rx) = mpsc::channel::<CompactionJob>();
207
208 let l1_for_worker = l1.clone();
209 let compaction_in_progress_worker = compaction_in_progress.clone();
210 let compaction_count_worker = compaction_count.clone();
211 let compaction_total_ns_worker = compaction_total_ns.clone();
212
213 thread::Builder::new()
214 .name("module-f-compaction".to_string())
215 .spawn(move || {
216 for job in compaction_rx {
217 let started = Instant::now();
218 match build_compacted_index(l1_for_worker.load_full().as_ref(), job.delta) {
219 Ok(new_index) => {
220 l1_for_worker.store(Arc::new(new_index));
221 compaction_count_worker.fetch_add(1, Ordering::Relaxed);
222 compaction_total_ns_worker
223 .fetch_add(started.elapsed().as_nanos() as u64, Ordering::Relaxed);
224 }
225 Err(err) => {
226 eprintln!("module-f compaction error: {err:#}");
227 }
228 }
229 compaction_in_progress_worker.store(false, Ordering::Release);
230 }
231 })
232 .context("failed spawning module-f compaction thread")?;
233
234 Ok(Self {
235 l0,
236 l1,
237 compaction_threshold,
238 compaction_in_progress,
239 compaction_count,
240 compaction_total_ns,
241 compaction_tx,
242 })
243 }
244
245 fn upsert(&self, key: String, meta: StoredObjectMeta) {
246 self.l0.insert(key, Some(meta));
247 self.maybe_schedule_compaction();
248 }
249
250 fn delete(&self, key: String) {
251 self.l0.insert(key, None);
252 self.maybe_schedule_compaction();
253 }
254
255 fn get(&self, key: &str) -> Option<StoredObjectMeta> {
256 if let Some(entry) = self.l0.get(key) {
257 return entry.value().clone();
258 }
259 self.l1.load().entries.get(key).cloned()
260 }
261
262 fn list_prefix(&self, prefix: &str, max_keys: usize) -> Vec<(String, StoredObjectMeta)> {
263 let mut merged = BTreeMap::<String, StoredObjectMeta>::new();
264 let l1 = self.l1.load_full();
265
266 let mut range = l1.fst.range().ge(prefix.as_bytes());
267 if let Some(upper) = prefix_upper_bound(prefix.as_bytes()) {
268 range = range.lt(upper.as_slice());
269 }
270 let mut stream = range.into_stream();
271 while let Some((key_bytes, _)) = stream.next() {
272 let key = String::from_utf8_lossy(key_bytes).to_string();
273 if let Some(meta) = l1.entries.get(&key) {
274 merged.insert(key, meta.clone());
275 }
276 }
277
278 for entry in self.l0.iter() {
279 let key = entry.key();
280 if !key.starts_with(prefix) {
281 continue;
282 }
283 match entry.value().clone() {
284 Some(meta) => {
285 merged.insert(key.clone(), meta);
286 }
287 None => {
288 merged.remove(key);
289 }
290 }
291 }
292
293 merged.into_iter().take(max_keys).collect()
294 }
295
296 fn compaction_metrics(&self) -> (u64, f64) {
297 let count = self.compaction_count.load(Ordering::Relaxed);
298 if count == 0 {
299 return (0, 0.0);
300 }
301 let total_ns = self.compaction_total_ns.load(Ordering::Relaxed);
302 let avg_ms = (total_ns as f64 / count as f64) / 1_000_000.0;
303 (count, avg_ms)
304 }
305
306 fn maybe_schedule_compaction(&self) {
307 if self.l0.len() < self.compaction_threshold {
308 return;
309 }
310 if self
311 .compaction_in_progress
312 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
313 .is_err()
314 {
315 return;
316 }
317
318 let mut delta = Vec::<(String, Option<StoredObjectMeta>)>::new();
319 for entry in self.l0.iter() {
320 delta.push((entry.key().clone(), entry.value().clone()));
321 }
322
323 if delta.is_empty() {
324 self.compaction_in_progress.store(false, Ordering::Release);
325 return;
326 }
327
328 for (key, _) in &delta {
329 let _ = self.l0.remove(key);
330 }
331
332 if self.compaction_tx.send(CompactionJob { delta }).is_err() {
333 self.compaction_in_progress.store(false, Ordering::Release);
334 }
335 }
336}
337
338fn build_compacted_index(
339 previous: &ImmutableIndex,
340 delta: Vec<(String, Option<StoredObjectMeta>)>,
341) -> Result<ImmutableIndex> {
342 let mut merged = previous.entries.clone();
343 for (key, value) in delta {
344 match value {
345 Some(meta) => {
346 merged.insert(key, meta);
347 }
348 None => {
349 merged.remove(&key);
350 }
351 }
352 }
353
354 let mut builder = fst::MapBuilder::memory();
355 for (ordinal, key) in merged.keys().enumerate() {
356 builder
357 .insert(key, ordinal as u64)
358 .with_context(|| format!("failed inserting key into compacted fst: {key}"))?;
359 }
360 let bytes = builder
361 .into_inner()
362 .context("failed finalizing compacted fst bytes")?;
363 let fst = fst::Map::new(bytes).context("failed loading compacted fst map")?;
364
365 Ok(ImmutableIndex {
366 entries: merged,
367 fst,
368 })
369}
370
371#[derive(Clone)]
372struct GatewayState {
373 appender: GroupCommitAppender,
374 index: LsmIndex,
375 buckets: Arc<RwLock<BTreeSet<String>>>,
376 default_bucket: String,
377 request_count: Arc<AtomicU64>,
378 puts: Arc<AtomicU64>,
379 gets: Arc<AtomicU64>,
380 lists: Arc<AtomicU64>,
381 deletes: Arc<AtomicU64>,
382 last_compaction_count: Arc<AtomicU64>,
383 last_compaction_avg_ms_x1000: Arc<AtomicU64>,
384}
385
386#[derive(Clone)]
387struct GatewayS3 {
388 state: GatewayState,
389}
390
391#[async_trait::async_trait]
392impl s3s::S3 for GatewayS3 {
393 async fn create_bucket(
394 &self,
395 req: S3Request<CreateBucketInput>,
396 ) -> S3Result<S3Response<CreateBucketOutput>> {
397 let bucket = req.input.bucket;
398 let mut buckets = self
399 .state
400 .buckets
401 .write()
402 .map_err(|_| s3_error!(InternalError))?;
403 buckets.insert(bucket);
404 Ok(S3Response::new(CreateBucketOutput::default()))
405 }
406
407 async fn head_bucket(
408 &self,
409 req: S3Request<HeadBucketInput>,
410 ) -> S3Result<S3Response<HeadBucketOutput>> {
411 let bucket = req.input.bucket;
412 let buckets = self
413 .state
414 .buckets
415 .read()
416 .map_err(|_| s3_error!(InternalError))?;
417 if !buckets.contains(&bucket) {
418 return Err(s3_error!(NoSuchBucket));
419 }
420 Ok(S3Response::new(HeadBucketOutput::default()))
421 }
422
423 async fn list_buckets(
424 &self,
425 _req: S3Request<ListBucketsInput>,
426 ) -> S3Result<S3Response<ListBucketsOutput>> {
427 let buckets = self
428 .state
429 .buckets
430 .read()
431 .map_err(|_| s3_error!(InternalError))?;
432 let mut items = Vec::with_capacity(buckets.len());
433 for bucket in buckets.iter() {
434 items.push(Bucket {
435 name: Some(bucket.clone()),
436 ..Default::default()
437 });
438 }
439
440 Ok(S3Response::new(ListBucketsOutput {
441 buckets: Some(items),
442 ..Default::default()
443 }))
444 }
445
446 async fn get_bucket_location(
447 &self,
448 req: S3Request<GetBucketLocationInput>,
449 ) -> S3Result<S3Response<GetBucketLocationOutput>> {
450 self.state.request_count.fetch_add(1, Ordering::Relaxed);
451 ensure_bucket_exists(&self.state, &req.input.bucket)?;
452 Ok(S3Response::new(GetBucketLocationOutput::default()))
453 }
454
455 async fn put_object(
456 &self,
457 req: S3Request<PutObjectInput>,
458 ) -> S3Result<S3Response<PutObjectOutput>> {
459 self.state.request_count.fetch_add(1, Ordering::Relaxed);
460 self.state.puts.fetch_add(1, Ordering::Relaxed);
461
462 let input = req.input;
463 let bucket = input.bucket;
464 let key = input.key;
465
466 ensure_bucket_exists(&self.state, &bucket)?;
467
468 let Some(body) = input.body else {
469 return Err(s3_error!(IncompleteBody));
470 };
471
472 let raw = read_streaming_blob(body)
473 .await
474 .map_err(|_| s3_error!(InvalidRequest, "failed reading request body"))?;
475 let original_len = u32::try_from(raw.len())
476 .map_err(|_| s3_error!(InvalidRequest, "object body too large for tracer bullet"))?;
477 let crc = crc32c::crc32c(&raw);
478 let compressed = zstd::bulk::compress(&raw, 1)
479 .map_err(|_| s3_error!(InternalError, "zstd compression failed"))?;
480
481 let stored = self
482 .state
483 .appender
484 .append_payload(compressed, original_len, crc)
485 .await
486 .map_err(|_| s3_error!(InternalError, "append path failed"))?;
487
488 self.state
489 .index
490 .upsert(storage_key(&bucket, &key), stored.clone());
491
492 let (compactions, avg_ms) = self.state.index.compaction_metrics();
493 self.state
494 .last_compaction_count
495 .store(compactions, Ordering::Relaxed);
496 self.state
497 .last_compaction_avg_ms_x1000
498 .store((avg_ms * 1000.0) as u64, Ordering::Relaxed);
499
500 Ok(S3Response::new(PutObjectOutput {
501 e_tag: Some(ETag::Strong(format!("{:08x}", stored.crc32c))),
502 ..Default::default()
503 }))
504 }
505
506 async fn get_object(
507 &self,
508 req: S3Request<GetObjectInput>,
509 ) -> S3Result<S3Response<GetObjectOutput>> {
510 self.state.request_count.fetch_add(1, Ordering::Relaxed);
511 self.state.gets.fetch_add(1, Ordering::Relaxed);
512
513 let input = req.input;
514 let bucket = input.bucket;
515 let key = input.key;
516
517 let Some(meta) = self.state.index.get(&storage_key(&bucket, &key)) else {
518 return Err(s3_error!(NoSuchKey));
519 };
520
521 let payload = self
522 .state
523 .appender
524 .read_payload(meta.payload_offset, meta.stored_len as usize)
525 .map_err(|_| s3_error!(InternalError, "read path failed"))?;
526
527 let mut restored = zstd::bulk::decompress(&payload, meta.original_len as usize)
528 .map_err(|_| s3_error!(InternalError, "zstd decompression failed"))?;
529
530 let mut content_range = None;
531 if let Some(range) = input.range {
532 let checked = range
533 .check(restored.len() as u64)
534 .map_err(|_| s3_error!(InvalidRange))?;
535 let start = checked.start as usize;
536 let end = checked.end as usize;
537 restored = restored[start..end].to_vec();
538 content_range = Some(format!(
539 "bytes {}-{}/{}",
540 checked.start,
541 checked.end.saturating_sub(1),
542 meta.original_len
543 ));
544 }
545
546 let content_len_i64 =
547 i64::try_from(restored.len()).map_err(|_| s3_error!(InternalError))?;
548
549 Ok(S3Response::new(GetObjectOutput {
550 body: Some(StreamingBlob::from(Body::from(restored))),
551 content_length: Some(content_len_i64),
552 content_range,
553 e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
554 last_modified: Some(meta.last_modified.clone()),
555 ..Default::default()
556 }))
557 }
558
559 async fn list_objects_v2(
560 &self,
561 req: S3Request<ListObjectsV2Input>,
562 ) -> S3Result<S3Response<ListObjectsV2Output>> {
563 self.state.request_count.fetch_add(1, Ordering::Relaxed);
564 self.state.lists.fetch_add(1, Ordering::Relaxed);
565
566 let input = req.input;
567 ensure_bucket_exists(&self.state, &input.bucket)?;
568
569 let prefix = input.prefix.clone().unwrap_or_default();
570 let max_keys_i32 = input.max_keys.unwrap_or(1000).max(1);
571 let max_keys = usize::try_from(max_keys_i32).map_err(|_| s3_error!(InvalidArgument))?;
572 let storage_prefix = storage_key_prefix(&input.bucket, &prefix);
573 let mut rows = self
574 .state
575 .index
576 .list_prefix(&storage_prefix, max_keys.saturating_add(1));
577
578 if let Some(token) = input
579 .continuation_token
580 .clone()
581 .or(input.start_after.clone())
582 {
583 let token_key = storage_key(&input.bucket, &token);
584 rows.retain(|(full_key, _)| full_key > &token_key);
585 }
586
587 let is_truncated = rows.len() > max_keys;
588 rows.truncate(max_keys);
589
590 let mut contents = Vec::with_capacity(rows.len());
591 for (full_key, meta) in rows {
592 let key = strip_storage_prefix(&input.bucket, &full_key)
593 .ok_or_else(|| s3_error!(InternalError, "invalid internal storage key"))?;
594 contents.push(Object {
595 key: Some(key),
596 size: Some(i64::from(meta.original_len)),
597 e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
598 last_modified: Some(meta.last_modified.clone()),
599 ..Default::default()
600 });
601 }
602
603 let key_count = i32::try_from(contents.len()).map_err(|_| s3_error!(InternalError))?;
604
605 Ok(S3Response::new(ListObjectsV2Output {
606 name: Some(input.bucket),
607 prefix: input.prefix,
608 key_count: Some(key_count),
609 max_keys: Some(max_keys_i32),
610 is_truncated: Some(is_truncated),
611 contents: (!contents.is_empty()).then_some(contents),
612 ..Default::default()
613 }))
614 }
615
616 async fn head_object(
617 &self,
618 req: S3Request<HeadObjectInput>,
619 ) -> S3Result<S3Response<HeadObjectOutput>> {
620 self.state.request_count.fetch_add(1, Ordering::Relaxed);
621 self.state.gets.fetch_add(1, Ordering::Relaxed);
622 let input = req.input;
623 let Some(meta) = self
624 .state
625 .index
626 .get(&storage_key(&input.bucket, &input.key))
627 else {
628 return Err(s3_error!(NoSuchKey));
629 };
630
631 Ok(S3Response::new(HeadObjectOutput {
632 content_length: Some(i64::from(meta.original_len)),
633 e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
634 last_modified: Some(meta.last_modified.clone()),
635 ..Default::default()
636 }))
637 }
638
639 async fn delete_object(
640 &self,
641 req: S3Request<DeleteObjectInput>,
642 ) -> S3Result<S3Response<DeleteObjectOutput>> {
643 self.state.request_count.fetch_add(1, Ordering::Relaxed);
644 self.state.deletes.fetch_add(1, Ordering::Relaxed);
645 let input = req.input;
646 self.state
647 .index
648 .delete(storage_key(&input.bucket, &input.key));
649 Ok(S3Response::new(DeleteObjectOutput::default()))
650 }
651
652 async fn delete_objects(
653 &self,
654 req: S3Request<DeleteObjectsInput>,
655 ) -> S3Result<S3Response<DeleteObjectsOutput>> {
656 self.state.request_count.fetch_add(1, Ordering::Relaxed);
657
658 let input = req.input;
659 ensure_bucket_exists(&self.state, &input.bucket)?;
660
661 let mut deleted = Vec::<DeletedObject>::new();
662 for object in input.delete.objects {
663 self.state.deletes.fetch_add(1, Ordering::Relaxed);
664 self.state
665 .index
666 .delete(storage_key(&input.bucket, &object.key));
667 deleted.push(DeletedObject {
668 key: Some(object.key),
669 ..Default::default()
670 });
671 }
672
673 Ok(S3Response::new(DeleteObjectsOutput {
674 deleted: Some(deleted),
675 ..Default::default()
676 }))
677 }
678}
679
680pub fn serve(config: ModuleFConfig) -> Result<()> {
681 validate_config(&config)?;
682
683 let listen_addr: SocketAddr = config
684 .listen_addr
685 .parse()
686 .with_context(|| format!("invalid listen_addr {}", config.listen_addr))?;
687
688 let appender = GroupCommitAppender::start(&config)?;
689 let index = LsmIndex::new(config.compaction_threshold)?;
690 let buckets = Arc::new(RwLock::new(BTreeSet::<String>::new()));
691
692 {
693 let mut guard = buckets
694 .write()
695 .map_err(|_| anyhow::anyhow!("bucket lock poisoned"))?;
696 guard.insert(config.default_bucket.clone());
697 }
698
699 let state = GatewayState {
700 appender: appender.clone(),
701 index,
702 buckets,
703 default_bucket: config.default_bucket.clone(),
704 request_count: Arc::new(AtomicU64::new(0)),
705 puts: Arc::new(AtomicU64::new(0)),
706 gets: Arc::new(AtomicU64::new(0)),
707 lists: Arc::new(AtomicU64::new(0)),
708 deletes: Arc::new(AtomicU64::new(0)),
709 last_compaction_count: Arc::new(AtomicU64::new(0)),
710 last_compaction_avg_ms_x1000: Arc::new(AtomicU64::new(0)),
711 };
712
713 let core_ids = core_affinity::get_core_ids().unwrap_or_default();
714 let worker_count = config.workers;
715 let access_key = config.access_key.clone();
716 let secret_key = config.secret_key.clone();
717 let accept_counts = Arc::new(
718 (0..worker_count)
719 .map(|_| AtomicU64::new(0))
720 .collect::<Vec<_>>(),
721 );
722
723 eprintln!(
724 "module-f starting: addr={} workers={} mode={} target={} default_bucket={} require_io_uring={}",
725 listen_addr,
726 worker_count,
727 appender.mode,
728 appender.read_path.display(),
729 state.default_bucket,
730 config.require_io_uring
731 );
732
733 let mut worker_handles = Vec::with_capacity(worker_count);
734 for worker_idx in 0..worker_count {
735 let state = state.clone();
736 let accept_counts = accept_counts.clone();
737 let access_key = access_key.clone();
738 let secret_key = secret_key.clone();
739 let worker_core = if core_ids.is_empty() {
740 None
741 } else {
742 Some(core_ids[worker_idx % core_ids.len()])
743 };
744
745 let handle = thread::Builder::new()
746 .name(format!("module-f-worker-{worker_idx}"))
747 .spawn(move || -> Result<()> {
748 if let Some(core_id) = worker_core {
749 let _ = core_affinity::set_for_current(core_id);
750 }
751
752 let std_listener = bind_reuseport_listener(listen_addr)
753 .with_context(|| format!("worker {} failed to bind listener", worker_idx))?;
754 let rt = tokio::runtime::Builder::new_current_thread()
755 .enable_io()
756 .enable_time()
757 .build()
758 .context("failed creating current-thread runtime")?;
759
760 rt.block_on(async move {
761 let local = tokio::task::LocalSet::new();
762 local
763 .run_until(async move {
764 let listener = tokio::net::TcpListener::from_std(std_listener)
765 .context("failed to adapt std listener")?;
766
767 let gateway = GatewayS3 { state };
768 let s3_service = {
769 let mut builder = S3ServiceBuilder::new(gateway);
770 builder.set_auth(SimpleAuth::from_single(
771 access_key.as_str(),
772 secret_key.as_str(),
773 ));
774 builder.build()
775 };
776
777 loop {
778 let (stream, _remote) = listener
779 .accept()
780 .await
781 .context("accept failed in module-f worker")?;
782 accept_counts[worker_idx].fetch_add(1, Ordering::Relaxed);
783 let service = s3_service.clone();
784
785 tokio::task::spawn_local(async move {
786 let io = TokioIo::new(stream);
787 let conn = ConnBuilder::new(TokioExecutor::new())
788 .serve_connection(io, service)
789 .into_owned();
790 if let Err(err) = conn.await {
791 eprintln!("module-f connection error: {err}");
792 }
793 });
794 }
795 })
796 .await
797 })
798 })
799 .with_context(|| format!("failed spawning module-f worker {worker_idx}"))?;
800
801 worker_handles.push(handle);
802 }
803
804 let stats_state = state.clone();
805 let stats_accept_counts = accept_counts.clone();
806 let _stats_handle = thread::Builder::new()
807 .name("module-f-stats".to_string())
808 .spawn(move || loop {
809 thread::sleep(Duration::from_secs(10));
810 let mut accepts = Vec::with_capacity(stats_accept_counts.len());
811 for (idx, counter) in stats_accept_counts.iter().enumerate() {
812 accepts.push(format!("w{idx}={}", counter.load(Ordering::Relaxed)));
813 }
814 let (compactions, avg_ms) = stats_state.index.compaction_metrics();
815 eprintln!(
816 "module-f stats requests={} puts={} gets={} lists={} deletes={} compactions={} compaction_avg_ms={:.3} accepts=[{}]",
817 stats_state.request_count.load(Ordering::Relaxed),
818 stats_state.puts.load(Ordering::Relaxed),
819 stats_state.gets.load(Ordering::Relaxed),
820 stats_state.lists.load(Ordering::Relaxed),
821 stats_state.deletes.load(Ordering::Relaxed),
822 compactions,
823 avg_ms,
824 accepts.join(",")
825 );
826 })
827 .context("failed spawning module-f stats thread")?;
828
829 for handle in worker_handles {
830 let worker_result = handle
831 .join()
832 .map_err(|_| anyhow::anyhow!("module-f worker thread panicked"))?;
833 worker_result?;
834 }
835
836 Ok(())
837}
838
839fn validate_config(config: &ModuleFConfig) -> Result<()> {
840 if config.workers == 0 {
841 anyhow::bail!("workers must be > 0");
842 }
843 if config.group_bytes == 0 || config.group_bytes % ALIGNMENT != 0 {
844 anyhow::bail!("group_bytes must be > 0 and aligned to {}", ALIGNMENT);
845 }
846 if config.compaction_threshold == 0 {
847 anyhow::bail!("compaction_threshold must be > 0");
848 }
849 if config.flush_interval_ms == 0 {
850 anyhow::bail!("flush_interval_ms must be > 0");
851 }
852 if config.default_bucket.trim().is_empty() {
853 anyhow::bail!("default bucket must be non-empty");
854 }
855 if config.access_key.trim().is_empty() || config.secret_key.trim().is_empty() {
856 anyhow::bail!("access_key and secret_key must be non-empty");
857 }
858 Ok(())
859}
860
861fn ensure_bucket_exists(state: &GatewayState, bucket: &str) -> S3Result<()> {
862 let buckets = state.buckets.read().map_err(|_| s3_error!(InternalError))?;
863 if !buckets.contains(bucket) {
864 return Err(s3_error!(NoSuchBucket));
865 }
866 Ok(())
867}
868
869async fn read_streaming_blob(mut body: StreamingBlob) -> Result<Vec<u8>> {
870 let mut out = Vec::new();
871 while let Some(chunk) = body
872 .try_next()
873 .await
874 .map_err(|err| anyhow::anyhow!("failed reading streaming blob: {err}"))?
875 {
876 out.extend_from_slice(chunk.as_ref());
877 }
878 Ok(out)
879}
880
/// Builds the internal index key for an object: the bucket name, a `/`, then the object key.
fn storage_key(bucket: &str, key: &str) -> String {
    let mut full_key = String::with_capacity(bucket.len() + 1 + key.len());
    full_key.push_str(bucket);
    full_key.push('/');
    full_key.push_str(key);
    full_key
}
884
/// Builds the internal index prefix that matches every object of `bucket` whose key starts with
/// `prefix`.
fn storage_key_prefix(bucket: &str, prefix: &str) -> String {
    [bucket, "/", prefix].concat()
}
888
/// Recovers the object key from an internal index key.
///
/// Returns `None` when `full_key` does not start with the bucket name followed by `/`.
fn strip_storage_prefix(bucket: &str, full_key: &str) -> Option<String> {
    let after_bucket = full_key.strip_prefix(bucket)?;
    let object_key = after_bucket.strip_prefix('/')?;
    Some(object_key.to_owned())
}
893
/// Computes the smallest byte string that is greater than every string starting with `prefix`.
///
/// The last byte that is not `0xFF` is incremented and everything after it is dropped. Returns
/// `None` when no such byte exists (an empty prefix or one made only of `0xFF` bytes), in which
/// case the prefix range has no upper bound.
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    let bump_at = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut upper = prefix[..=bump_at].to_vec();
    upper[bump_at] += 1;
    Some(upper)
}
905
906fn bind_reuseport_listener(addr: SocketAddr) -> Result<std::net::TcpListener> {
907 let domain = if addr.is_ipv4() {
908 Domain::IPV4
909 } else {
910 Domain::IPV6
911 };
912
913 let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
914 .context("failed creating TCP socket")?;
915 socket
916 .set_reuse_address(true)
917 .context("failed setting SO_REUSEADDR")?;
918
919 #[cfg(any(target_os = "linux", target_os = "android"))]
920 socket
921 .set_reuse_port(true)
922 .context("failed setting SO_REUSEPORT")?;
923
924 socket
925 .bind(&addr.into())
926 .with_context(|| format!("failed binding {addr}"))?;
927 socket.listen(2048).context("failed listen")?;
928 socket
929 .set_nonblocking(true)
930 .context("failed setting non-blocking")?;
931
932 Ok(socket.into())
933}
934
935fn open_target_file(target: &Path, allow_fallback: bool) -> Result<(File, String, PathBuf)> {
936 match open_direct_target(target, false) {
937 Ok(file) => Ok((file, "block".to_string(), target.to_path_buf())),
938 Err(err) => {
939 if !allow_fallback {
940 return Err(err).with_context(|| {
941 format!(
942 "opening {} as module-f block target failed and fallback disabled",
943 target.display()
944 )
945 });
946 }
947
948 let fallback = fallback_path_for(target);
949 if let Some(parent) = fallback.parent() {
950 if !parent.as_os_str().is_empty() {
951 fs::create_dir_all(parent).with_context(|| {
952 format!("failed to create fallback parent {}", parent.display())
953 })?;
954 }
955 }
956
957 let file = open_direct_target(&fallback, true).with_context(|| {
958 format!(
959 "failed opening module-f file fallback {}",
960 fallback.display()
961 )
962 })?;
963
964 Ok((file, "file-fallback".to_string(), fallback))
965 }
966 }
967}
968
969fn open_direct_target(path: &Path, create_file: bool) -> Result<File> {
970 let mut opts = OpenOptions::new();
971 opts.write(true).read(true);
972 if create_file {
973 opts.create(true).truncate(true).mode(0o644);
974 }
975
976 #[cfg(target_os = "linux")]
977 {
978 opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
979 }
980
981 opts.open(path)
982 .with_context(|| format!("open failed for {}", path.display()))
983}
984
/// Chooses the regular file used when `target` cannot be opened directly.
///
/// Targets under `/dev` map to a fixed file in `/tmp`; any other target keeps its location and
/// gets the extension `direct.bin`.
fn fallback_path_for(target: &Path) -> PathBuf {
    if !target.starts_with("/dev") {
        return target.with_extension("direct.bin");
    }
    PathBuf::from("/tmp/tracer-bullet-module-f-direct.bin")
}
992
993fn run_group_commit_loop(
994 file: File,
995 submit_rx: mpsc::Receiver<AppendRequest>,
996 group_bytes: usize,
997 flush_interval: Duration,
998) -> Result<()> {
999 let mut logical = Vec::<u8>::with_capacity(group_bytes + group_bytes / 8);
1000 let mut pending = Vec::<PendingRecord>::with_capacity(128);
1001 let mut file_offset = 0_u64;
1002
1003 loop {
1004 match submit_rx.recv_timeout(flush_interval) {
1005 Ok(request) => {
1006 let start = logical.len();
1007 logical.extend_from_slice(&(request.payload.len() as u32).to_le_bytes());
1008 logical.extend_from_slice(&request.original_len.to_le_bytes());
1009 logical.extend_from_slice(&request.crc32c.to_le_bytes());
1010 logical.extend_from_slice(&request.payload);
1011
1012 pending.push(PendingRecord {
1013 record_start: start,
1014 stored_len: request.payload.len() as u32,
1015 original_len: request.original_len,
1016 crc32c: request.crc32c,
1017 response_tx: request.response_tx,
1018 });
1019
1020 if logical.len() >= group_bytes {
1021 flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
1022 }
1023 }
1024 Err(mpsc::RecvTimeoutError::Timeout) => {
1025 if !logical.is_empty() {
1026 flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
1027 }
1028 }
1029 Err(mpsc::RecvTimeoutError::Disconnected) => {
1030 if !logical.is_empty() {
1031 flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
1032 }
1033 return Ok(());
1034 }
1035 }
1036 }
1037}
1038
1039fn flush_group(
1040 file: &File,
1041 logical: &mut Vec<u8>,
1042 pending: &mut Vec<PendingRecord>,
1043 file_offset: &mut u64,
1044) -> Result<()> {
1045 if logical.is_empty() {
1046 return Ok(());
1047 }
1048
1049 let physical_len = align_up(logical.len(), ALIGNMENT);
1050 let mut aligned = vec![0_u8; physical_len + ALIGNMENT];
1051 let align_start = aligned_offset(aligned.as_ptr() as usize, ALIGNMENT);
1052 let slice = &mut aligned[align_start..align_start + physical_len];
1053 slice[..logical.len()].copy_from_slice(logical);
1054 if physical_len > logical.len() {
1055 slice[logical.len()..].fill(0);
1056 }
1057
1058 if (slice.as_ptr() as usize) % ALIGNMENT != 0
1059 || slice.len() % ALIGNMENT != 0
1060 || (*file_offset % ALIGNMENT as u64) != 0
1061 {
1062 anyhow::bail!("unaligned direct-io group flush");
1063 }
1064
1065 write_all_at(file, slice, *file_offset).context("module-f group write failed")?;
1066
1067 for item in pending.drain(..) {
1068 let payload_offset = *file_offset + item.record_start as u64 + RECORD_HEADER_BYTES as u64;
1069 let result = StoredObjectMeta {
1070 payload_offset,
1071 stored_len: item.stored_len,
1072 original_len: item.original_len,
1073 crc32c: item.crc32c,
1074 last_modified: Timestamp::from(SystemTime::now()),
1075 };
1076 let _ = item.response_tx.send(Ok(result));
1077 }
1078
1079 *file_offset += physical_len as u64;
1080 logical.clear();
1081
1082 Ok(())
1083}
1084
/// Writes all of `buf` to `file` starting at `offset`, continuing after short writes.
///
/// A call interrupted by a signal (`ErrorKind::Interrupted`) is retried instead of being
/// reported, matching the behavior of the standard library's `write_all` family.
///
/// # Errors
///
/// Returns `ErrorKind::WriteZero` when the file accepts no more bytes, or the first I/O error
/// other than an interruption.
fn write_all_at(file: &File, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
    while !buf.is_empty() {
        match file.write_at(buf, offset) {
            Ok(0) => {
                return Err(io::Error::new(
                    ErrorKind::WriteZero,
                    "write_at returned zero bytes",
                ));
            }
            Ok(written) => {
                buf = &buf[written..];
                offset = offset.saturating_add(written as u64);
            }
            // Nothing was written; retry the same range.
            Err(err) if err.kind() == ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
1099
/// Rounds `value` up to the next multiple of `align`; multiples are returned unchanged.
///
/// # Panics
///
/// Panics when `align` is zero.
fn align_up(value: usize, align: usize) -> usize {
    match value % align {
        0 => value,
        remainder => value + (align - remainder),
    }
}
1107
/// Returns how many bytes must be skipped from address `ptr` to reach the next address that is
/// a multiple of `align` (zero when `ptr` is already aligned).
///
/// # Panics
///
/// Panics when `align` is zero.
fn aligned_offset(ptr: usize, align: usize) -> usize {
    match ptr % align {
        0 => 0,
        misalignment => align - misalignment,
    }
}
1111
#[cfg(test)]
mod tests {
    use super::*;

    /// The last byte of a prefix without `0xFF` bytes is bumped by one.
    #[test]
    fn prefix_upper_bound_increments_correctly() {
        let upper = prefix_upper_bound(b"abc");
        assert_eq!(upper.as_deref(), Some(&b"abd"[..]));
    }

    /// `align_up` and `aligned_offset` agree on aligned and unaligned inputs.
    #[test]
    fn alignment_helpers_work() {
        for (value, rounded) in [(4096_usize, 4096_usize), (4097, 8192)] {
            assert_eq!(align_up(value, 4096), rounded);
        }
        for (address, skip) in [(0_usize, 0_usize), (1, 4095)] {
            assert_eq!(aligned_offset(address, 4096), skip);
        }
    }

    /// Stripping the bucket prefix from a storage key yields the original object key.
    #[test]
    fn storage_key_round_trip() {
        let object_key = "path/file";
        let full = storage_key("bucket", object_key);
        assert_eq!(
            strip_storage_prefix("bucket", &full).as_deref(),
            Some(object_key)
        );
    }
}