// nexus_core/module_f.rs
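
//! Module F: a minimal S3-compatible gateway over a single append-only log.
//!
//! Write path: PUT bodies are zstd-compressed (level 1), checksummed with
//! CRC32C, and handed to a dedicated group-commit thread that batches
//! records and writes 4 KiB-aligned groups (O_DIRECT | O_DSYNC on Linux).
//! Read path: payloads are fetched with positional reads and decompressed.
//! The key index keeps recent writes in a lock-free L0 skip map and
//! periodically compacts them into an immutable L1 snapshot whose keys are
//! mirrored in an `fst` map for prefix scans. Workers share the listen port
//! via SO_REUSEPORT, one single-threaded tokio runtime per worker.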

use std::collections::{BTreeMap, BTreeSet};
use std::fs::{self, File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::unix::fs::{FileExt, OpenOptionsExt};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime};

use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use crossbeam_skiplist::SkipMap;
use fst::{IntoStreamer, Streamer};
use futures::TryStreamExt;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnBuilder;
#[cfg(target_os = "linux")]
use nix::libc;
use s3s::auth::SimpleAuth;
use s3s::dto::*;
use s3s::service::S3ServiceBuilder;
use s3s::{s3_error, Body, S3Request, S3Response, S3Result};
use socket2::{Domain, Protocol, Socket, Type};

use crate::module_d;

const ALIGNMENT: usize = 4096;
const RECORD_HEADER_BYTES: usize = 12;

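/// Runtime configuration for the module-f gateway. `group_bytes` must be a
/// non-zero multiple of `ALIGNMENT` so group flushes satisfy direct-I/O
/// alignment rules; see `validate_config` for the full set of invariants.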
#[derive(Debug, Clone)]
pub struct ModuleFConfig {
    pub listen_addr: String,
    pub workers: usize,
    pub target_path: PathBuf,
    pub group_bytes: usize,
    pub compaction_threshold: usize,
    pub flush_interval_ms: u64,
    pub default_bucket: String,
    pub access_key: String,
    pub secret_key: String,
    pub allow_file_fallback: bool,
    pub require_io_uring: bool,
}

#[derive(Debug, Clone)]
struct StoredObjectMeta {
    payload_offset: u64,
    stored_len: u32,
    original_len: u32,
    crc32c: u32,
    last_modified: Timestamp,
}

#[derive(Debug)]
struct PendingRecord {
    record_start: usize,
    stored_len: u32,
    original_len: u32,
    crc32c: u32,
    response_tx: tokio::sync::oneshot::Sender<Result<StoredObjectMeta>>,
}

#[derive(Debug)]
struct AppendRequest {
    payload: Vec<u8>,
    original_len: u32,
    crc32c: u32,
    response_tx: tokio::sync::oneshot::Sender<Result<StoredObjectMeta>>,
}

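/// Handle to the single group-commit writer thread. Clones share one mpsc
/// sender; callers submit an `AppendRequest` and await a oneshot reply that
/// carries the record's on-disk location once its group has been flushed.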
#[derive(Clone)]
struct GroupCommitAppender {
    submit_tx: mpsc::Sender<AppendRequest>,
    read_path: PathBuf,
    mode: String,
}

impl GroupCommitAppender {
    fn start(config: &ModuleFConfig) -> Result<Self> {
        if config.require_io_uring && !module_d::io_uring_available() {
            anyhow::bail!("module-f requires io_uring, but probe failed in current environment");
        }

        let (file, mode, resolved_path) =
            open_target_file(&config.target_path, config.allow_file_fallback).with_context(
                || {
                    format!(
                        "failed to open module-f target {}",
                        config.target_path.display()
                    )
                },
            )?;

        let (submit_tx, submit_rx) = mpsc::channel::<AppendRequest>();
        let group_bytes = config.group_bytes;
        let flush_interval = Duration::from_millis(config.flush_interval_ms.max(1));

        thread::Builder::new()
            .name("module-f-group-commit".to_string())
            .spawn(move || {
                if let Err(err) =
                    run_group_commit_loop(file, submit_rx, group_bytes, flush_interval)
                {
                    eprintln!("module-f appender thread exiting with error: {err:#}");
                }
            })
            .context("failed spawning module-f group-commit thread")?;

        Ok(Self {
            submit_tx,
            read_path: resolved_path,
            mode,
        })
    }

    async fn append_payload(
        &self,
        payload: Vec<u8>,
        original_len: u32,
        crc32c: u32,
    ) -> Result<StoredObjectMeta> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
        self.submit_tx
            .send(AppendRequest {
                payload,
                original_len,
                crc32c,
                response_tx,
            })
            .map_err(|_| anyhow::anyhow!("module-f appender queue closed"))?;

        response_rx
            .await
            .context("module-f appender response channel dropped")?
    }

    fn read_payload(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
        let file = File::open(&self.read_path)
            .with_context(|| format!("failed opening read path {}", self.read_path.display()))?;

        let mut buffer = vec![0_u8; len];
        let mut filled = 0usize;
        while filled < len {
            let n = file
                .read_at(&mut buffer[filled..], offset + filled as u64)
                .context("module-f read_at failed")?;
            if n == 0 {
                anyhow::bail!(
                    "unexpected EOF while reading payload at offset {} (wanted {}, got {})",
                    offset,
                    len,
                    filled
                );
            }
            filled += n;
        }

        Ok(buffer)
    }
}

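/// Immutable L1 snapshot. `entries` is the authoritative key-to-metadata
/// map; `fst` mirrors the same keys so prefix listings can run a bounded
/// range stream instead of scanning the whole map.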
#[derive(Debug)]
struct ImmutableIndex {
    entries: BTreeMap<String, StoredObjectMeta>,
    fst: fst::Map<Vec<u8>>,
}

impl ImmutableIndex {
    fn empty() -> Self {
        // An empty in-memory fst builder always finalizes cleanly; surface
        // any unexpected failure instead of masking it with a default Vec,
        // which would only panic later inside `Map::new`.
        let builder = fst::MapBuilder::memory();
        let bytes = builder
            .into_inner()
            .expect("finalizing an empty in-memory fst builder cannot fail");
        let fst = fst::Map::new(bytes).expect("empty fst bytes form a valid map");
        Self {
            entries: BTreeMap::new(),
            fst,
        }
    }
}

#[derive(Debug)]
struct CompactionJob {
    delta: Vec<(String, Option<StoredObjectMeta>)>,
}

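/// Two-level LSM-style index. Writes land in the lock-free L0 skip map,
/// with `None` acting as a tombstone; once L0 reaches
/// `compaction_threshold` entries, a background thread folds the drained
/// delta into a fresh `ImmutableIndex` published atomically via `ArcSwap`.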
#[derive(Clone)]
struct LsmIndex {
    l0: Arc<SkipMap<String, Option<StoredObjectMeta>>>,
    l1: Arc<ArcSwap<ImmutableIndex>>,
    compaction_threshold: usize,
    compaction_in_progress: Arc<AtomicBool>,
    compaction_count: Arc<AtomicU64>,
    compaction_total_ns: Arc<AtomicU64>,
    compaction_tx: mpsc::Sender<CompactionJob>,
}

impl LsmIndex {
    fn new(compaction_threshold: usize) -> Result<Self> {
        let l0 = Arc::new(SkipMap::<String, Option<StoredObjectMeta>>::new());
        let l1 = Arc::new(ArcSwap::from_pointee(ImmutableIndex::empty()));
        let compaction_in_progress = Arc::new(AtomicBool::new(false));
        let compaction_count = Arc::new(AtomicU64::new(0));
        let compaction_total_ns = Arc::new(AtomicU64::new(0));
        let (compaction_tx, compaction_rx) = mpsc::channel::<CompactionJob>();

        let l1_for_worker = l1.clone();
        let compaction_in_progress_worker = compaction_in_progress.clone();
        let compaction_count_worker = compaction_count.clone();
        let compaction_total_ns_worker = compaction_total_ns.clone();

        thread::Builder::new()
            .name("module-f-compaction".to_string())
            .spawn(move || {
                for job in compaction_rx {
                    let started = Instant::now();
                    match build_compacted_index(l1_for_worker.load_full().as_ref(), job.delta) {
                        Ok(new_index) => {
                            l1_for_worker.store(Arc::new(new_index));
                            compaction_count_worker.fetch_add(1, Ordering::Relaxed);
                            compaction_total_ns_worker
                                .fetch_add(started.elapsed().as_nanos() as u64, Ordering::Relaxed);
                        }
                        Err(err) => {
                            eprintln!("module-f compaction error: {err:#}");
                        }
                    }
                    compaction_in_progress_worker.store(false, Ordering::Release);
                }
            })
            .context("failed spawning module-f compaction thread")?;

        Ok(Self {
            l0,
            l1,
            compaction_threshold,
            compaction_in_progress,
            compaction_count,
            compaction_total_ns,
            compaction_tx,
        })
    }

    fn upsert(&self, key: String, meta: StoredObjectMeta) {
        self.l0.insert(key, Some(meta));
        self.maybe_schedule_compaction();
    }

    fn delete(&self, key: String) {
        self.l0.insert(key, None);
        self.maybe_schedule_compaction();
    }

    fn get(&self, key: &str) -> Option<StoredObjectMeta> {
        if let Some(entry) = self.l0.get(key) {
            return entry.value().clone();
        }
        self.l1.load().entries.get(key).cloned()
    }

    fn list_prefix(&self, prefix: &str, max_keys: usize) -> Vec<(String, StoredObjectMeta)> {
        let mut merged = BTreeMap::<String, StoredObjectMeta>::new();
        let l1 = self.l1.load_full();

        let mut range = l1.fst.range().ge(prefix.as_bytes());
        if let Some(upper) = prefix_upper_bound(prefix.as_bytes()) {
            range = range.lt(upper.as_slice());
        }
        let mut stream = range.into_stream();
        while let Some((key_bytes, _)) = stream.next() {
            let key = String::from_utf8_lossy(key_bytes).to_string();
            if let Some(meta) = l1.entries.get(&key) {
                merged.insert(key, meta.clone());
            }
        }

        for entry in self.l0.iter() {
            let key = entry.key();
            if !key.starts_with(prefix) {
                continue;
            }
            match entry.value().clone() {
                Some(meta) => {
                    merged.insert(key.clone(), meta);
                }
                None => {
                    merged.remove(key);
                }
            }
        }

        merged.into_iter().take(max_keys).collect()
    }

    fn compaction_metrics(&self) -> (u64, f64) {
        let count = self.compaction_count.load(Ordering::Relaxed);
        if count == 0 {
            return (0, 0.0);
        }
        let total_ns = self.compaction_total_ns.load(Ordering::Relaxed);
        let avg_ms = (total_ns as f64 / count as f64) / 1_000_000.0;
        (count, avg_ms)
    }

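    /// Drains L0 into a compaction job once the threshold is reached.
    /// Note: drained keys leave L0 before the new L1 snapshot is published,
    /// so a concurrent `get` can briefly miss a key that is mid-compaction;
    /// this tracer-bullet build tolerates that window.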
    fn maybe_schedule_compaction(&self) {
        if self.l0.len() < self.compaction_threshold {
            return;
        }
        if self
            .compaction_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }

        let mut delta = Vec::<(String, Option<StoredObjectMeta>)>::new();
        for entry in self.l0.iter() {
            delta.push((entry.key().clone(), entry.value().clone()));
        }

        if delta.is_empty() {
            self.compaction_in_progress.store(false, Ordering::Release);
            return;
        }

        for (key, _) in &delta {
            let _ = self.l0.remove(key);
        }

        if self.compaction_tx.send(CompactionJob { delta }).is_err() {
            self.compaction_in_progress.store(false, Ordering::Release);
        }
    }
}

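/// Folds a drained L0 delta into the previous L1 snapshot: tombstones
/// remove entries, then the fst is rebuilt over the merged key set. The fst
/// values are just ordinals, since metadata lookups go through `entries`.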
fn build_compacted_index(
    previous: &ImmutableIndex,
    delta: Vec<(String, Option<StoredObjectMeta>)>,
) -> Result<ImmutableIndex> {
    let mut merged = previous.entries.clone();
    for (key, value) in delta {
        match value {
            Some(meta) => {
                merged.insert(key, meta);
            }
            None => {
                merged.remove(&key);
            }
        }
    }

    let mut builder = fst::MapBuilder::memory();
    for (ordinal, key) in merged.keys().enumerate() {
        builder
            .insert(key, ordinal as u64)
            .with_context(|| format!("failed inserting key into compacted fst: {key}"))?;
    }
    let bytes = builder
        .into_inner()
        .context("failed finalizing compacted fst bytes")?;
    let fst = fst::Map::new(bytes).context("failed loading compacted fst map")?;

    Ok(ImmutableIndex {
        entries: merged,
        fst,
    })
}

#[derive(Clone)]
struct GatewayState {
    appender: GroupCommitAppender,
    index: LsmIndex,
    buckets: Arc<RwLock<BTreeSet<String>>>,
    default_bucket: String,
    request_count: Arc<AtomicU64>,
    puts: Arc<AtomicU64>,
    gets: Arc<AtomicU64>,
    lists: Arc<AtomicU64>,
    deletes: Arc<AtomicU64>,
    last_compaction_count: Arc<AtomicU64>,
    last_compaction_avg_ms_x1000: Arc<AtomicU64>,
}

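/// `s3s` facade over the shared gateway state. Only the operations the
/// tracer bullet exercises are implemented here; the remaining methods fall
/// back to the `s3s::S3` trait's default implementations.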
#[derive(Clone)]
struct GatewayS3 {
    state: GatewayState,
}

#[async_trait::async_trait]
impl s3s::S3 for GatewayS3 {
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        let bucket = req.input.bucket;
        let mut buckets = self
            .state
            .buckets
            .write()
            .map_err(|_| s3_error!(InternalError))?;
        buckets.insert(bucket);
        Ok(S3Response::new(CreateBucketOutput::default()))
    }

    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        let bucket = req.input.bucket;
        let buckets = self
            .state
            .buckets
            .read()
            .map_err(|_| s3_error!(InternalError))?;
        if !buckets.contains(&bucket) {
            return Err(s3_error!(NoSuchBucket));
        }
        Ok(S3Response::new(HeadBucketOutput::default()))
    }

    async fn list_buckets(
        &self,
        _req: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        let buckets = self
            .state
            .buckets
            .read()
            .map_err(|_| s3_error!(InternalError))?;
        let mut items = Vec::with_capacity(buckets.len());
        for bucket in buckets.iter() {
            items.push(Bucket {
                name: Some(bucket.clone()),
                ..Default::default()
            });
        }

        Ok(S3Response::new(ListBucketsOutput {
            buckets: Some(items),
            ..Default::default()
        }))
    }

    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        ensure_bucket_exists(&self.state, &req.input.bucket)?;
        Ok(S3Response::new(GetBucketLocationOutput::default()))
    }

    async fn put_object(
        &self,
        req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        self.state.puts.fetch_add(1, Ordering::Relaxed);

        let input = req.input;
        let bucket = input.bucket;
        let key = input.key;

        ensure_bucket_exists(&self.state, &bucket)?;

        let Some(body) = input.body else {
            return Err(s3_error!(IncompleteBody));
        };

        let raw = read_streaming_blob(body)
            .await
            .map_err(|_| s3_error!(InvalidRequest, "failed reading request body"))?;
        let original_len = u32::try_from(raw.len())
            .map_err(|_| s3_error!(InvalidRequest, "object body too large for tracer bullet"))?;
        let crc = crc32c::crc32c(&raw);
        let compressed = zstd::bulk::compress(&raw, 1)
            .map_err(|_| s3_error!(InternalError, "zstd compression failed"))?;

        let stored = self
            .state
            .appender
            .append_payload(compressed, original_len, crc)
            .await
            .map_err(|_| s3_error!(InternalError, "append path failed"))?;

        self.state
            .index
            .upsert(storage_key(&bucket, &key), stored.clone());

        let (compactions, avg_ms) = self.state.index.compaction_metrics();
        self.state
            .last_compaction_count
            .store(compactions, Ordering::Relaxed);
        self.state
            .last_compaction_avg_ms_x1000
            .store((avg_ms * 1000.0) as u64, Ordering::Relaxed);

        Ok(S3Response::new(PutObjectOutput {
            e_tag: Some(ETag::Strong(format!("{:08x}", stored.crc32c))),
            ..Default::default()
        }))
    }

    async fn get_object(
        &self,
        req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        self.state.gets.fetch_add(1, Ordering::Relaxed);

        let input = req.input;
        let bucket = input.bucket;
        let key = input.key;

        let Some(meta) = self.state.index.get(&storage_key(&bucket, &key)) else {
            return Err(s3_error!(NoSuchKey));
        };

        let payload = self
            .state
            .appender
            .read_payload(meta.payload_offset, meta.stored_len as usize)
            .map_err(|_| s3_error!(InternalError, "read path failed"))?;

        let mut restored = zstd::bulk::decompress(&payload, meta.original_len as usize)
            .map_err(|_| s3_error!(InternalError, "zstd decompression failed"))?;

        let mut content_range = None;
        if let Some(range) = input.range {
            let checked = range
                .check(restored.len() as u64)
                .map_err(|_| s3_error!(InvalidRange))?;
            let start = checked.start as usize;
            let end = checked.end as usize;
            restored = restored[start..end].to_vec();
            content_range = Some(format!(
                "bytes {}-{}/{}",
                checked.start,
                checked.end.saturating_sub(1),
                meta.original_len
            ));
        }

        let content_len_i64 =
            i64::try_from(restored.len()).map_err(|_| s3_error!(InternalError))?;

        Ok(S3Response::new(GetObjectOutput {
            body: Some(StreamingBlob::from(Body::from(restored))),
            content_length: Some(content_len_i64),
            content_range,
            e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
            last_modified: Some(meta.last_modified.clone()),
            ..Default::default()
        }))
    }

    async fn list_objects_v2(
        &self,
        req: S3Request<ListObjectsV2Input>,
    ) -> S3Result<S3Response<ListObjectsV2Output>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        self.state.lists.fetch_add(1, Ordering::Relaxed);

        let input = req.input;
        ensure_bucket_exists(&self.state, &input.bucket)?;

        let prefix = input.prefix.clone().unwrap_or_default();
        let max_keys_i32 = input.max_keys.unwrap_or(1000).max(1);
        let max_keys = usize::try_from(max_keys_i32).map_err(|_| s3_error!(InvalidArgument))?;
        let storage_prefix = storage_key_prefix(&input.bucket, &prefix);
        let mut rows = self
            .state
            .index
            .list_prefix(&storage_prefix, max_keys.saturating_add(1));

        if let Some(token) = input
            .continuation_token
            .clone()
            .or(input.start_after.clone())
        {
            let token_key = storage_key(&input.bucket, &token);
            rows.retain(|(full_key, _)| full_key > &token_key);
        }

        let is_truncated = rows.len() > max_keys;
        rows.truncate(max_keys);

        let mut contents = Vec::with_capacity(rows.len());
        for (full_key, meta) in rows {
            let key = strip_storage_prefix(&input.bucket, &full_key)
                .ok_or_else(|| s3_error!(InternalError, "invalid internal storage key"))?;
            contents.push(Object {
                key: Some(key),
                size: Some(i64::from(meta.original_len)),
                e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
                last_modified: Some(meta.last_modified.clone()),
                ..Default::default()
            });
        }

        let key_count = i32::try_from(contents.len()).map_err(|_| s3_error!(InternalError))?;

        Ok(S3Response::new(ListObjectsV2Output {
            name: Some(input.bucket),
            prefix: input.prefix,
            key_count: Some(key_count),
            max_keys: Some(max_keys_i32),
            is_truncated: Some(is_truncated),
            contents: (!contents.is_empty()).then_some(contents),
            ..Default::default()
        }))
    }

    async fn head_object(
        &self,
        req: S3Request<HeadObjectInput>,
    ) -> S3Result<S3Response<HeadObjectOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        self.state.gets.fetch_add(1, Ordering::Relaxed);
        let input = req.input;
        let Some(meta) = self
            .state
            .index
            .get(&storage_key(&input.bucket, &input.key))
        else {
            return Err(s3_error!(NoSuchKey));
        };

        Ok(S3Response::new(HeadObjectOutput {
            content_length: Some(i64::from(meta.original_len)),
            e_tag: Some(ETag::Strong(format!("{:08x}", meta.crc32c))),
            last_modified: Some(meta.last_modified.clone()),
            ..Default::default()
        }))
    }

    async fn delete_object(
        &self,
        req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);
        self.state.deletes.fetch_add(1, Ordering::Relaxed);
        let input = req.input;
        self.state
            .index
            .delete(storage_key(&input.bucket, &input.key));
        Ok(S3Response::new(DeleteObjectOutput::default()))
    }

    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        self.state.request_count.fetch_add(1, Ordering::Relaxed);

        let input = req.input;
        ensure_bucket_exists(&self.state, &input.bucket)?;

        let mut deleted = Vec::<DeletedObject>::new();
        for object in input.delete.objects {
            self.state.deletes.fetch_add(1, Ordering::Relaxed);
            self.state
                .index
                .delete(storage_key(&input.bucket, &object.key));
            deleted.push(DeletedObject {
                key: Some(object.key),
                ..Default::default()
            });
        }

        Ok(S3Response::new(DeleteObjectsOutput {
            deleted: Some(deleted),
            ..Default::default()
        }))
    }
}

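/// Entry point: validates config, starts the appender and compaction
/// threads, then spawns `workers` OS threads. Each worker pins itself to a
/// core when core IDs are available, binds its own SO_REUSEPORT listener,
/// and serves connections on a current-thread tokio runtime so accepted
/// streams never migrate across cores. A stats thread logs counters every
/// ten seconds.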
pub fn serve(config: ModuleFConfig) -> Result<()> {
    validate_config(&config)?;

    let listen_addr: SocketAddr = config
        .listen_addr
        .parse()
        .with_context(|| format!("invalid listen_addr {}", config.listen_addr))?;

    let appender = GroupCommitAppender::start(&config)?;
    let index = LsmIndex::new(config.compaction_threshold)?;
    let buckets = Arc::new(RwLock::new(BTreeSet::<String>::new()));

    {
        let mut guard = buckets
            .write()
            .map_err(|_| anyhow::anyhow!("bucket lock poisoned"))?;
        guard.insert(config.default_bucket.clone());
    }

    let state = GatewayState {
        appender: appender.clone(),
        index,
        buckets,
        default_bucket: config.default_bucket.clone(),
        request_count: Arc::new(AtomicU64::new(0)),
        puts: Arc::new(AtomicU64::new(0)),
        gets: Arc::new(AtomicU64::new(0)),
        lists: Arc::new(AtomicU64::new(0)),
        deletes: Arc::new(AtomicU64::new(0)),
        last_compaction_count: Arc::new(AtomicU64::new(0)),
        last_compaction_avg_ms_x1000: Arc::new(AtomicU64::new(0)),
    };

    let core_ids = core_affinity::get_core_ids().unwrap_or_default();
    let worker_count = config.workers;
    let access_key = config.access_key.clone();
    let secret_key = config.secret_key.clone();
    let accept_counts = Arc::new(
        (0..worker_count)
            .map(|_| AtomicU64::new(0))
            .collect::<Vec<_>>(),
    );

    eprintln!(
        "module-f starting: addr={} workers={} mode={} target={} default_bucket={} require_io_uring={}",
        listen_addr,
        worker_count,
        appender.mode,
        appender.read_path.display(),
        state.default_bucket,
        config.require_io_uring
    );

    let mut worker_handles = Vec::with_capacity(worker_count);
    for worker_idx in 0..worker_count {
        let state = state.clone();
        let accept_counts = accept_counts.clone();
        let access_key = access_key.clone();
        let secret_key = secret_key.clone();
        let worker_core = if core_ids.is_empty() {
            None
        } else {
            Some(core_ids[worker_idx % core_ids.len()])
        };

        let handle = thread::Builder::new()
            .name(format!("module-f-worker-{worker_idx}"))
            .spawn(move || -> Result<()> {
                if let Some(core_id) = worker_core {
                    let _ = core_affinity::set_for_current(core_id);
                }

                let std_listener = bind_reuseport_listener(listen_addr)
                    .with_context(|| format!("worker {} failed to bind listener", worker_idx))?;
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_io()
                    .enable_time()
                    .build()
                    .context("failed creating current-thread runtime")?;

                rt.block_on(async move {
                    let local = tokio::task::LocalSet::new();
                    local
                        .run_until(async move {
                            let listener = tokio::net::TcpListener::from_std(std_listener)
                                .context("failed to adapt std listener")?;

                            let gateway = GatewayS3 { state };
                            let s3_service = {
                                let mut builder = S3ServiceBuilder::new(gateway);
                                builder.set_auth(SimpleAuth::from_single(
                                    access_key.as_str(),
                                    secret_key.as_str(),
                                ));
                                builder.build()
                            };

                            loop {
                                let (stream, _remote) = listener
                                    .accept()
                                    .await
                                    .context("accept failed in module-f worker")?;
                                accept_counts[worker_idx].fetch_add(1, Ordering::Relaxed);
                                let service = s3_service.clone();

                                tokio::task::spawn_local(async move {
                                    let io = TokioIo::new(stream);
                                    let conn = ConnBuilder::new(TokioExecutor::new())
                                        .serve_connection(io, service)
                                        .into_owned();
                                    if let Err(err) = conn.await {
                                        eprintln!("module-f connection error: {err}");
                                    }
                                });
                            }
                        })
                        .await
                })
            })
            .with_context(|| format!("failed spawning module-f worker {worker_idx}"))?;

        worker_handles.push(handle);
    }

    let stats_state = state.clone();
    let stats_accept_counts = accept_counts.clone();
    let _stats_handle = thread::Builder::new()
        .name("module-f-stats".to_string())
        .spawn(move || loop {
            thread::sleep(Duration::from_secs(10));
            let mut accepts = Vec::with_capacity(stats_accept_counts.len());
            for (idx, counter) in stats_accept_counts.iter().enumerate() {
                accepts.push(format!("w{idx}={}", counter.load(Ordering::Relaxed)));
            }
            let (compactions, avg_ms) = stats_state.index.compaction_metrics();
            eprintln!(
                "module-f stats requests={} puts={} gets={} lists={} deletes={} compactions={} compaction_avg_ms={:.3} accepts=[{}]",
                stats_state.request_count.load(Ordering::Relaxed),
                stats_state.puts.load(Ordering::Relaxed),
                stats_state.gets.load(Ordering::Relaxed),
                stats_state.lists.load(Ordering::Relaxed),
                stats_state.deletes.load(Ordering::Relaxed),
                compactions,
                avg_ms,
                accepts.join(",")
            );
        })
        .context("failed spawning module-f stats thread")?;

    for handle in worker_handles {
        let worker_result = handle
            .join()
            .map_err(|_| anyhow::anyhow!("module-f worker thread panicked"))?;
        worker_result?;
    }

    Ok(())
}

fn validate_config(config: &ModuleFConfig) -> Result<()> {
    if config.workers == 0 {
        anyhow::bail!("workers must be > 0");
    }
    if config.group_bytes == 0 || config.group_bytes % ALIGNMENT != 0 {
        anyhow::bail!("group_bytes must be > 0 and aligned to {}", ALIGNMENT);
    }
    if config.compaction_threshold == 0 {
        anyhow::bail!("compaction_threshold must be > 0");
    }
    if config.flush_interval_ms == 0 {
        anyhow::bail!("flush_interval_ms must be > 0");
    }
    if config.default_bucket.trim().is_empty() {
        anyhow::bail!("default bucket must be non-empty");
    }
    if config.access_key.trim().is_empty() || config.secret_key.trim().is_empty() {
        anyhow::bail!("access_key and secret_key must be non-empty");
    }
    Ok(())
}

fn ensure_bucket_exists(state: &GatewayState, bucket: &str) -> S3Result<()> {
    let buckets = state.buckets.read().map_err(|_| s3_error!(InternalError))?;
    if !buckets.contains(bucket) {
        return Err(s3_error!(NoSuchBucket));
    }
    Ok(())
}

async fn read_streaming_blob(mut body: StreamingBlob) -> Result<Vec<u8>> {
    let mut out = Vec::new();
    while let Some(chunk) = body
        .try_next()
        .await
        .map_err(|err| anyhow::anyhow!("failed reading streaming blob: {err}"))?
    {
        out.extend_from_slice(chunk.as_ref());
    }
    Ok(out)
}

fn storage_key(bucket: &str, key: &str) -> String {
    format!("{bucket}/{key}")
}

fn storage_key_prefix(bucket: &str, prefix: &str) -> String {
    format!("{bucket}/{prefix}")
}

fn strip_storage_prefix(bucket: &str, full_key: &str) -> Option<String> {
    let prefix = format!("{bucket}/");
    full_key.strip_prefix(&prefix).map(ToString::to_string)
}

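/// Returns the smallest byte string strictly greater than every key sharing
/// `prefix`, by incrementing the last non-0xFF byte and truncating after
/// it. Returns `None` when the prefix is all 0xFF bytes (no finite upper
/// bound), in which case the caller leaves the fst range open-ended.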
fn prefix_upper_bound(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut upper = prefix.to_vec();
    for idx in (0..upper.len()).rev() {
        if upper[idx] != 0xFF {
            upper[idx] += 1;
            upper.truncate(idx + 1);
            return Some(upper);
        }
    }
    None
}

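/// Builds a nonblocking std listener with SO_REUSEADDR (plus SO_REUSEPORT
/// on Linux/Android) so every worker can bind the same address and let the
/// kernel spread incoming connections across them.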
fn bind_reuseport_listener(addr: SocketAddr) -> Result<std::net::TcpListener> {
    let domain = if addr.is_ipv4() {
        Domain::IPV4
    } else {
        Domain::IPV6
    };

    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
        .context("failed creating TCP socket")?;
    socket
        .set_reuse_address(true)
        .context("failed setting SO_REUSEADDR")?;

    #[cfg(any(target_os = "linux", target_os = "android"))]
    socket
        .set_reuse_port(true)
        .context("failed setting SO_REUSEPORT")?;

    socket
        .bind(&addr.into())
        .with_context(|| format!("failed binding {addr}"))?;
    socket.listen(2048).context("failed listen")?;
    socket
        .set_nonblocking(true)
        .context("failed setting non-blocking")?;

    Ok(socket.into())
}

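/// Tries the configured target first (typically a block device); when that
/// fails and fallback is allowed, switches to a regular file opened with
/// the same direct-I/O flags and reports the resolved mode and path.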
fn open_target_file(target: &Path, allow_fallback: bool) -> Result<(File, String, PathBuf)> {
    match open_direct_target(target, false) {
        Ok(file) => Ok((file, "block".to_string(), target.to_path_buf())),
        Err(err) => {
            if !allow_fallback {
                return Err(err).with_context(|| {
                    format!(
                        "opening {} as module-f block target failed and fallback disabled",
                        target.display()
                    )
                });
            }

            let fallback = fallback_path_for(target);
            if let Some(parent) = fallback.parent() {
                if !parent.as_os_str().is_empty() {
                    fs::create_dir_all(parent).with_context(|| {
                        format!("failed to create fallback parent {}", parent.display())
                    })?;
                }
            }

            let file = open_direct_target(&fallback, true).with_context(|| {
                format!(
                    "failed opening module-f file fallback {}",
                    fallback.display()
                )
            })?;

            Ok((file, "file-fallback".to_string(), fallback))
        }
    }
}

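/// Opens `path` read-write; on Linux, O_DIRECT bypasses the page cache
/// (which is why buffers, lengths, and offsets must be `ALIGNMENT`-aligned;
/// see `flush_group`) and O_DSYNC makes each write durable before it
/// returns.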
fn open_direct_target(path: &Path, create_file: bool) -> Result<File> {
    let mut opts = OpenOptions::new();
    opts.write(true).read(true);
    if create_file {
        opts.create(true).truncate(true).mode(0o644);
    }

    #[cfg(target_os = "linux")]
    {
        opts.custom_flags(libc::O_DIRECT | libc::O_DSYNC);
    }

    opts.open(path)
        .with_context(|| format!("open failed for {}", path.display()))
}

fn fallback_path_for(target: &Path) -> PathBuf {
    if target.starts_with("/dev") {
        PathBuf::from("/tmp/tracer-bullet-module-f-direct.bin")
    } else {
        target.with_extension("direct.bin")
    }
}
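
/// Single-writer group-commit loop. Each record is framed as
/// `[stored_len: u32 LE][original_len: u32 LE][crc32c: u32 LE][payload]`,
/// so the header is `RECORD_HEADER_BYTES` (12) bytes. Records accumulate in
/// `logical` until the group reaches `group_bytes` or `flush_interval`
/// elapses with data buffered, then the whole group is flushed at once.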
fn run_group_commit_loop(
    file: File,
    submit_rx: mpsc::Receiver<AppendRequest>,
    group_bytes: usize,
    flush_interval: Duration,
) -> Result<()> {
    let mut logical = Vec::<u8>::with_capacity(group_bytes + group_bytes / 8);
    let mut pending = Vec::<PendingRecord>::with_capacity(128);
    let mut file_offset = 0_u64;

    loop {
        match submit_rx.recv_timeout(flush_interval) {
            Ok(request) => {
                let start = logical.len();
                logical.extend_from_slice(&(request.payload.len() as u32).to_le_bytes());
                logical.extend_from_slice(&request.original_len.to_le_bytes());
                logical.extend_from_slice(&request.crc32c.to_le_bytes());
                logical.extend_from_slice(&request.payload);

                pending.push(PendingRecord {
                    record_start: start,
                    stored_len: request.payload.len() as u32,
                    original_len: request.original_len,
                    crc32c: request.crc32c,
                    response_tx: request.response_tx,
                });

                if logical.len() >= group_bytes {
                    flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                if !logical.is_empty() {
                    flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !logical.is_empty() {
                    flush_group(&file, &mut logical, &mut pending, &mut file_offset)?;
                }
                return Ok(());
            }
        }
    }
}

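/// Writes one group with direct I/O: the logical bytes are copied into an
/// over-allocated scratch buffer at a 4 KiB-aligned start, zero-padded to
/// the next `ALIGNMENT` boundary, checked for pointer/length/offset
/// alignment, and written at `file_offset`. Only after the write succeeds
/// do the pending oneshot senders receive their final payload offsets.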
fn flush_group(
    file: &File,
    logical: &mut Vec<u8>,
    pending: &mut Vec<PendingRecord>,
    file_offset: &mut u64,
) -> Result<()> {
    if logical.is_empty() {
        return Ok(());
    }

    let physical_len = align_up(logical.len(), ALIGNMENT);
    let mut aligned = vec![0_u8; physical_len + ALIGNMENT];
    let align_start = aligned_offset(aligned.as_ptr() as usize, ALIGNMENT);
    let slice = &mut aligned[align_start..align_start + physical_len];
    slice[..logical.len()].copy_from_slice(logical);
    if physical_len > logical.len() {
        slice[logical.len()..].fill(0);
    }

    if (slice.as_ptr() as usize) % ALIGNMENT != 0
        || slice.len() % ALIGNMENT != 0
        || (*file_offset % ALIGNMENT as u64) != 0
    {
        anyhow::bail!("unaligned direct-io group flush");
    }

    write_all_at(file, slice, *file_offset).context("module-f group write failed")?;

    for item in pending.drain(..) {
        let payload_offset = *file_offset + item.record_start as u64 + RECORD_HEADER_BYTES as u64;
        let result = StoredObjectMeta {
            payload_offset,
            stored_len: item.stored_len,
            original_len: item.original_len,
            crc32c: item.crc32c,
            last_modified: Timestamp::from(SystemTime::now()),
        };
        let _ = item.response_tx.send(Ok(result));
    }

    *file_offset += physical_len as u64;
    logical.clear();

    Ok(())
}

fn write_all_at(file: &File, mut buf: &[u8], mut offset: u64) -> io::Result<()> {
    while !buf.is_empty() {
        let written = file.write_at(buf, offset)?;
        if written == 0 {
            return Err(io::Error::new(
                ErrorKind::WriteZero,
                "write_at returned zero bytes",
            ));
        }
        buf = &buf[written..];
        offset = offset.saturating_add(written as u64);
    }
    Ok(())
}

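/// Rounds `value` up to the next multiple of `align`. Together with
/// `aligned_offset`, which returns how many bytes to skip from `ptr` to the
/// next `align` boundary (0 if already aligned), this lets `flush_group`
/// carve an aligned window out of an ordinary `Vec` allocation.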
fn align_up(value: usize, align: usize) -> usize {
    if value % align == 0 {
        value
    } else {
        value + (align - (value % align))
    }
}

fn aligned_offset(ptr: usize, align: usize) -> usize {
    (align - (ptr % align)) % align
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn prefix_upper_bound_increments_correctly() {
        assert_eq!(prefix_upper_bound(b"abc").unwrap(), b"abd");
    }

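    #[test]
    fn prefix_upper_bound_handles_terminal_bytes() {
        // A trailing 0xFF is dropped and the preceding byte is bumped; an
        // all-0xFF prefix has no finite upper bound.
        assert_eq!(prefix_upper_bound(b"a\xff").unwrap(), b"b");
        assert_eq!(prefix_upper_bound(b"\xff\xff"), None);
    }
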
    #[test]
    fn alignment_helpers_work() {
        assert_eq!(align_up(4096, 4096), 4096);
        assert_eq!(align_up(4097, 4096), 8192);
        assert_eq!(aligned_offset(0, 4096), 0);
        assert_eq!(aligned_offset(1, 4096), 4095);
    }

    #[test]
    fn storage_key_round_trip() {
        let full = storage_key("bucket", "path/file");
        assert_eq!(strip_storage_prefix("bucket", &full).unwrap(), "path/file");
    }
}