Skip to main content

hyperi_rustlib/tiered_sink/
tiered.rs

// Project:   hyperi-rustlib
// File:      src/tiered_sink/tiered.rs
// Purpose:   TieredSink implementation
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! TieredSink implementation.
10
11use crate::tiered_sink::{
12    CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13    TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
/// A tiered sink with automatic disk spillover.
///
/// Wraps any `Sink` implementation and automatically spills messages to disk
/// when the primary sink is unavailable or backpressuring. A background task
/// drains spooled messages back to the primary when it recovers.
///
/// Call `shutdown().await` before dropping: `Drop` can only signal the
/// background tasks, it cannot wait for them.
pub struct TieredSink<S: Sink> {
    /// Primary sink; an `Arc` clone is handed to the drain task.
    sink: Arc<S>,
    /// Producer side of the on-disk yaque queue, locked for each enqueue.
    spool_sender: Arc<Mutex<Sender>>,
    /// Receiver is owned by TieredSink but accessed via Arc clone by drainer task
    #[allow(dead_code)]
    spool_receiver: Arc<Mutex<Receiver>>,
    /// Items believed to be in the spool; seeded from a disk scan in `new`,
    /// incremented on enqueue and shared with the drain task.
    spool_count: Arc<AtomicU64>,
    /// Compressed payload bytes believed to be in the spool; same lifecycle
    /// as `spool_count`.
    spool_bytes: Arc<AtomicU64>,
    /// Circuit breaker gating the hot path; shared with the drain task.
    circuit: Arc<CircuitBreaker>,
    /// Codec applied to each payload before it is written to the spool.
    codec: CompressionCodec,
    /// Configuration this sink was built with.
    config: TieredSinkConfig,
    /// Shutdown signal shared by the drain task and the disk poller.
    shutdown: Arc<Notify>,
    /// Drain task handle; `None` once `shutdown()` has taken it.
    drain_handle: Option<JoinHandle<()>>,
    /// Written by the disk-capacity poller; while `false`, spool writes are
    /// rejected with `DiskUnavailable`.
    disk_available: Arc<std::sync::atomic::AtomicBool>,
    /// Disk-capacity poller handle; `None` when `config.disk_aware` is unset.
    #[allow(dead_code)]
    disk_poller_handle: Option<JoinHandle<()>>,

    /// Serialises send + drain in `StrictFifo`. `None` otherwise.
    fifo_gate: Option<Arc<Mutex<()>>>,

    // Metrics: messages delivered directly (hot) vs. written to the spool (cold).
    hot_path_count: AtomicU64,
    cold_path_count: AtomicU64,
}
51
52impl<S: Sink> TieredSink<S> {
53    /// Create a new TieredSink wrapping the given sink.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the spool file cannot be opened.
58    pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59        let (sender, receiver) =
60            yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61                path: config.spool_path.display().to_string(),
62                message: e.to_string(),
63            })?;
64
65        let sink = Arc::new(sink);
66        let spool_sender = Arc::new(Mutex::new(sender));
67        let spool_receiver = Arc::new(Mutex::new(receiver));
68
69        // Recover counters in spawn_blocking -- segment
70        // files can be GB-sized after a crash; walking them on the
71        // async runtime pins a tokio worker.
72        let spool_path_for_scan = config.spool_path.clone();
73        let (initial_count, initial_bytes) =
74            tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75                .await
76                .unwrap_or((0, 0));
77        let spool_count = Arc::new(AtomicU64::new(initial_count));
78        let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80        let circuit = Arc::new(CircuitBreaker::new(
81            config.circuit_failure_threshold,
82            config.circuit_reset_timeout(),
83        ));
84        let shutdown = Arc::new(Notify::new());
85        let codec = config.compression;
86        let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88        // Shared by senders + drainer in StrictFifo to enforce
89        // total ordering at the sink.
90        let fifo_gate =
91            matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93        // Start disk-aware capacity poller if configured
94        let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95            let spool_path = config.spool_path.clone();
96            let disk_flag = Arc::clone(&disk_available);
97            let shutdown_clone = Arc::clone(&shutdown);
98            let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99            let max_usage = disk_cfg.max_usage_percent;
100
101            tokio::spawn(disk_capacity_poller(
102                spool_path,
103                disk_flag,
104                max_usage,
105                poll_interval,
106                shutdown_clone,
107            ))
108        });
109
110        // Start drain task
111        let drain_handle = tokio::spawn(drainer::drain_loop(
112            Arc::clone(&sink),
113            Arc::clone(&spool_receiver),
114            Arc::clone(&spool_count),
115            Arc::clone(&spool_bytes),
116            Arc::clone(&circuit),
117            codec,
118            config.drain_strategy,
119            config.drain_interval(),
120            Arc::clone(&shutdown),
121            fifo_gate.as_ref().map(Arc::clone),
122        ));
123
124        Ok(Self {
125            sink,
126            spool_sender,
127            spool_receiver,
128            spool_count,
129            spool_bytes,
130            circuit,
131            codec,
132            config,
133            shutdown,
134            drain_handle: Some(drain_handle),
135            disk_available,
136            disk_poller_handle,
137            fifo_gate,
138            hot_path_count: AtomicU64::new(0),
139            cold_path_count: AtomicU64::new(0),
140        })
141    }
142
143    /// Send a message through the tiered sink.
144    ///
145    /// The message goes through the hot path (direct to sink) if:
146    /// - Circuit is closed AND ordering mode is Interleaved
147    /// - OR circuit is closed AND ordering is StrictFifo AND spool is empty
148    ///
149    /// Otherwise, the message is spooled to disk.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if:
154    /// - The sink returns a fatal error
155    /// - The spool is full
156    /// - Compression fails
157    pub async fn send(&self, data: &[u8]) -> Result<()> {
158        // StrictFifo: serialise decision + send + enqueue against
159        // other senders and the drainer. Interleaved: no gate.
160        let _gate = match &self.fifo_gate {
161            Some(gate) => Some(gate.lock().await),
162            None => None,
163        };
164
165        let use_hot_path = self.should_use_hot_path().await;
166
167        if use_hot_path {
168            match self.try_hot_path(data).await {
169                Ok(()) => {
170                    self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171                    #[cfg(feature = "metrics")]
172                    ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173                    return Ok(());
174                }
175                Err(TieredSinkError::Sink(_)) => {
176                    // Fatal error, don't spool
177                    return Err(TieredSinkError::Sink("fatal sink error".into()));
178                }
179                Err(_) => {
180                    // Retryable error, fall through to spool
181                }
182            }
183        }
184
185        // Cold path: spool to disk
186        self.spool_message(data).await?;
187        self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188        #[cfg(feature = "metrics")]
189        ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190        Ok(())
191    }
192
193    /// Determine if we should attempt the hot path.
194    async fn should_use_hot_path(&self) -> bool {
195        let circuit_state = self.circuit.state().await;
196
197        #[cfg(feature = "metrics")]
198        ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199            CircuitState::Closed => 0.0,
200            CircuitState::HalfOpen => 1.0,
201            CircuitState::Open => 2.0,
202        });
203
204        match circuit_state {
205            CircuitState::Open => false,
206            CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207                OrderingMode::Interleaved => true,
208                OrderingMode::StrictFifo => {
209                    // Only use hot path if spool is empty
210                    self.spool_count.load(AtomicOrdering::Relaxed) == 0
211                }
212            },
213        }
214    }
215
216    /// Try to send via hot path (direct to sink).
217    async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218        let send_timeout = self.config.send_timeout_duration();
219
220        match timeout(send_timeout, self.sink.try_send(data)).await {
221            Ok(Ok(())) => {
222                self.circuit.record_success().await;
223                Ok(())
224            }
225            Ok(Err(SinkError::Full)) => {
226                // Backpressure, don't count as failure for circuit
227                Err(TieredSinkError::Spool("sink full".into()))
228            }
229            Ok(Err(SinkError::Unavailable)) => {
230                self.circuit.record_failure().await;
231                #[cfg(feature = "metrics")]
232                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233                Err(TieredSinkError::Spool("sink unavailable".into()))
234            }
235            Ok(Err(SinkError::Fatal(e))) => {
236                // Fatal error - propagate, don't spool
237                Err(TieredSinkError::Sink(e.to_string()))
238            }
239            Err(_timeout) => {
240                self.circuit.record_failure().await;
241                #[cfg(feature = "metrics")]
242                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243                Err(TieredSinkError::Spool("send timeout".into()))
244            }
245        }
246    }
247
248    /// Reserve capacity (`fetch_update`) before enqueue; roll back
249    /// on failure. Atomic reservation prevents two concurrent
250    /// callers from both passing the cap check and overshooting.
251    async fn spool_message(&self, data: &[u8]) -> Result<()> {
252        // Check disk availability first
253        if !self.disk_available.load(AtomicOrdering::Relaxed) {
254            return Err(TieredSinkError::DiskUnavailable);
255        }
256
257        let compressed = self.codec.compress(data)?;
258        let compressed_len = compressed.len() as u64;
259
260        // Reserve item slot.
261        if let Some(max_items) = self.config.max_spool_items {
262            let max_items_u64 = max_items as u64;
263            self.spool_count
264                .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
265                    if cur < max_items_u64 {
266                        Some(cur + 1)
267                    } else {
268                        None
269                    }
270                })
271                .map_err(|_| {
272                    TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
273                })?;
274        } else {
275            self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
276        }
277
278        // Reserve byte budget; roll back item slot on failure.
279        if let Some(max_bytes) = self.config.max_spool_bytes {
280            if let Err(current_bytes) = self.spool_bytes.fetch_update(
281                AtomicOrdering::AcqRel,
282                AtomicOrdering::Acquire,
283                |cur| {
284                    cur.checked_add(compressed_len)
285                        .filter(|new| *new <= max_bytes)
286                },
287            ) {
288                self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
289                return Err(TieredSinkError::SpoolFull(format!(
290                    "max spool bytes {max_bytes} reached (current: {current_bytes}, \
291                     new message: {compressed_len})"
292                )));
293            }
294        } else {
295            self.spool_bytes
296                .fetch_add(compressed_len, AtomicOrdering::AcqRel);
297        }
298
299        // Enqueue; roll back both reservations on failure.
300        let mut sender = self.spool_sender.lock().await;
301        if let Err(e) = sender.send(compressed).await {
302            drop(sender);
303            self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
304            self.spool_bytes
305                .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
306            return Err(TieredSinkError::Spool(e.to_string()));
307        }
308        drop(sender);
309
310        #[cfg(feature = "metrics")]
311        {
312            ::metrics::gauge!("dfe_spool_messages")
313                .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
314            ::metrics::gauge!("dfe_spool_bytes")
315                .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
316            // New default (metrics audit): spill RATE. drain < enqueue => backlog.
317            ::metrics::counter!("dfe_spool_enqueue_total").increment(1);
318        }
319
320        #[cfg(feature = "tracing")]
321        tracing::debug!(
322            spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
323            spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
324            "Message spooled to disk"
325        );
326
327        Ok(())
328    }
329
330    /// Get the number of messages currently in the spool.
331    #[allow(clippy::cast_possible_truncation)]
332    pub async fn spool_len(&self) -> usize {
333        self.spool_count.load(AtomicOrdering::Relaxed) as usize
334    }
335
336    /// Check if the spool is empty.
337    pub async fn spool_is_empty(&self) -> bool {
338        self.spool_count.load(AtomicOrdering::Relaxed) == 0
339    }
340
341    /// Get the approximate number of bytes currently in the spool.
342    #[must_use]
343    pub fn spool_bytes(&self) -> u64 {
344        self.spool_bytes.load(AtomicOrdering::Relaxed)
345    }
346
347    /// Check if disk is available for spooling.
348    #[must_use]
349    pub fn is_disk_available(&self) -> bool {
350        self.disk_available.load(AtomicOrdering::Relaxed)
351    }
352
353    /// Get the current circuit breaker state.
354    pub async fn circuit_state(&self) -> CircuitState {
355        self.circuit.state().await
356    }
357
358    /// Get hot path message count.
359    #[must_use]
360    pub fn hot_path_count(&self) -> u64 {
361        self.hot_path_count.load(AtomicOrdering::Relaxed)
362    }
363
364    /// Get cold path (spooled) message count.
365    #[must_use]
366    pub fn cold_path_count(&self) -> u64 {
367        self.cold_path_count.load(AtomicOrdering::Relaxed)
368    }
369
370    /// Get a reference to the underlying sink.
371    pub fn inner(&self) -> &S {
372        &self.sink
373    }
374
375    /// Manually reset the circuit breaker.
376    pub async fn reset_circuit(&self) {
377        self.circuit.reset().await;
378    }
379
380    /// Shutdown the drain task gracefully.
381    pub async fn shutdown(mut self) {
382        self.shutdown.notify_one();
383        if let Some(handle) = self.drain_handle.take() {
384            let _ = handle.await;
385        }
386    }
387}
388
/// Count existing items and sum payload bytes in a yaque queue directory.
///
/// yaque stores messages as `[4-byte Hamming header][payload]` in segment
/// files named `<n>.q`. This scan reads the header as a big-endian `u32` and
/// takes the payload length from its low 26 bits. An all-`0xFF` header ends
/// a segment.
///
/// The receiver position lives in `recv-metadata` as two big-endian `u64`s:
/// segment, then byte offset. Only data at or after that position is counted.
///
/// Best effort: unreadable segments are skipped, a missing or short
/// `recv-metadata` means position `(0, 0)`, and a trailing record whose
/// payload runs past the end of its file is not counted.
///
/// Returns `(item_count, payload_bytes)`.
fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
    const EOF_HEADER: [u8; 4] = [0xFF; 4];
    const LEN_MASK: u32 = 0x03_FF_FF_FF;

    if !path.is_dir() {
        return (0, 0);
    }

    // A missing, unreadable or short metadata file all fall back to (0, 0).
    let (recv_segment, recv_position) = std::fs::read(path.join("recv-metadata"))
        .ok()
        .filter(|raw| raw.len() >= 16)
        .map(|raw| {
            let mut segment_be = [0u8; 8];
            let mut offset_be = [0u8; 8];
            segment_be.copy_from_slice(&raw[0..8]);
            offset_be.copy_from_slice(&raw[8..16]);
            (u64::from_be_bytes(segment_be), u64::from_be_bytes(offset_be))
        })
        .unwrap_or((0, 0));

    // Numbered `*.q` segments the receiver has not fully consumed, oldest first.
    let mut segment_ids: Vec<u64> = std::fs::read_dir(path)
        .map(|listing| {
            listing
                .flatten()
                .map(|entry| entry.path())
                .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("q"))
                .filter_map(|p| p.file_stem()?.to_str()?.parse::<u64>().ok())
                .filter(|&id| id >= recv_segment)
                .collect()
        })
        .unwrap_or_default();
    segment_ids.sort_unstable();

    let mut items = 0u64;
    let mut payload_total = 0u64;

    for id in segment_ids {
        let Ok(segment) = std::fs::read(path.join(format!("{id}.q"))) else {
            continue;
        };

        // Resume mid-file only in the segment the receiver is currently on.
        #[allow(clippy::cast_possible_truncation)]
        let mut cursor = if id == recv_segment {
            recv_position as usize
        } else {
            0
        };

        while cursor + 4 <= segment.len() {
            let header: [u8; 4] = segment[cursor..cursor + 4].try_into().unwrap_or([0; 4]);
            if header == EOF_HEADER {
                break;
            }
            let payload_len = (u32::from_be_bytes(header) & LEN_MASK) as usize;
            cursor += 4 + payload_len;
            // Count only records whose payload is fully present on disk.
            if cursor <= segment.len() {
                items += 1;
                payload_total += payload_len as u64;
            }
        }
    }

    (items, payload_total)
}
472
473/// Check available disk space using `statvfs`.
474///
475/// Returns `(total_bytes, available_bytes)` for the filesystem containing `path`.
476///
477/// # Safety
478///
479/// Calls `libc::statvfs` which is unsafe but well-defined when given a valid path.
480#[allow(unsafe_code)]
481fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
482    use std::ffi::CString;
483    let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
484
485    // SAFETY: zeroed statvfs is a valid initialisation for the struct
486    // before passing to libc::statvfs which fills all fields.
487    // c_path is a valid null-terminated C string pointing to an existing
488    // filesystem path, and stat is a properly-sized statvfs struct.
489    #[allow(unsafe_code)]
490    let stat = unsafe {
491        let mut stat: libc::statvfs = std::mem::zeroed();
492        let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
493        if result != 0 {
494            return None;
495        }
496        stat
497    };
498
499    // Portability: libc::statvfs field widths differ across platforms.
500    // Linux: f_blocks/f_bavail/f_frsize are all u64. macOS (aarch64): f_frsize
501    // is c_ulong (u64) but f_blocks/f_bavail are fsblkcnt_t (u32), so only those
502    // two need the widening cast here; f_frsize is already u64. Fixes #39.
503    #[cfg(target_os = "macos")]
504    {
505        let block_size: u64 = stat.f_frsize;
506        let total: u64 = u64::from(stat.f_blocks) * block_size;
507        let available: u64 = u64::from(stat.f_bavail) * block_size;
508        Some((total, available))
509    }
510    #[cfg(not(target_os = "macos"))]
511    {
512        let block_size = stat.f_frsize;
513        let total = stat.f_blocks * block_size;
514        let available = stat.f_bavail * block_size;
515        Some((total, available))
516    }
517}
518
519/// Background poller that checks disk usage and sets a flag.
520async fn disk_capacity_poller(
521    spool_path: std::path::PathBuf,
522    disk_available: Arc<std::sync::atomic::AtomicBool>,
523    max_usage_percent: f64,
524    poll_interval: std::time::Duration,
525    shutdown: Arc<Notify>,
526) {
527    loop {
528        tokio::select! {
529            () = shutdown.notified() => {
530                #[cfg(feature = "tracing")]
531                tracing::debug!("Disk capacity poller shutting down");
532                return;
533            }
534            () = tokio::time::sleep(poll_interval) => {}
535        }
536
537        let disk_space = check_disk_space(&spool_path);
538
539        #[cfg(feature = "metrics")]
540        if let Some((total, avail)) = disk_space {
541            ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
542            ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
543            // New default (metrics audit): disk fill fraction (0.0-1.0). Disk
544            // total is known here, so emit the ratio directly.
545            if total > 0 {
546                ::metrics::gauge!("dfe_spool_disk_usage_ratio")
547                    .set(1.0 - (avail as f64 / total as f64));
548            }
549        }
550
551        let available = disk_space.is_none_or(|(total, avail)| {
552            if total == 0 {
553                return true;
554            }
555            let used_ratio = 1.0 - (avail as f64 / total as f64);
556            let ok = used_ratio < max_usage_percent;
557            #[cfg(feature = "tracing")]
558            if !ok {
559                tracing::warn!(
560                    used_percent = format!("{:.1}%", used_ratio * 100.0),
561                    threshold = format!("{:.1}%", max_usage_percent * 100.0),
562                    "Disk usage exceeds threshold, pausing spool writes"
563                );
564            }
565            ok
566        });
567
568        disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
569    }
570}
571
572impl<S: Sink> Drop for TieredSink<S> {
573    fn drop(&mut self) {
574        // Durability requires explicit `shutdown().await`.
575        // Drop can only notify the background drainer -- it can't
576        // await it. If anything's still spooled at drop time, the
577        // caller has skipped the explicit shutdown and risks losing
578        // that data. Warn + count so this shows up in dashboards.
579        let pending = self.spool_count.load(AtomicOrdering::Relaxed);
580        if pending > 0 {
581            #[cfg(feature = "metrics")]
582            ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
583            #[cfg(feature = "tracing")]
584            tracing::warn!(
585                pending,
586                "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
587            );
588        }
589        self.shutdown.notify_one();
590    }
591}
592
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicBool;
    use tempfile::tempdir;

    /// Error type for the in-memory sink; only its `Display` text matters.
    #[derive(Debug)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl std::error::Error for TestError {}

    /// In-memory sink whose availability is toggled by the test body.
    struct TestSink {
        available: AtomicBool,
        received: Mutex<Vec<Vec<u8>>>,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                available: AtomicBool::new(true),
                received: Mutex::new(Vec::new()),
            }
        }

        /// A sink that rejects every send until `set_available(true)`.
        fn offline() -> Self {
            let sink = Self::new();
            sink.set_available(false);
            sink
        }

        fn set_available(&self, available: bool) {
            self.available.store(available, AtomicOrdering::SeqCst);
        }

        async fn received_count(&self) -> usize {
            self.received.lock().await.len()
        }
    }

    impl Sink for TestSink {
        type Error = TestError;

        async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
            if !self.available.load(AtomicOrdering::SeqCst) {
                return Err(SinkError::Unavailable);
            }
            self.received.lock().await.push(data.to_vec());
            Ok(())
        }
    }

    /// Config for spool `name` under `dir`; the circuit opens on the first failure.
    fn fail_fast_config(dir: &tempfile::TempDir, name: &str) -> TieredSinkConfig {
        let spool_path = dir.path().join(name);
        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;
        config
    }

    #[tokio::test]
    async fn test_hot_path_when_available() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let tiered = TieredSink::new(TestSink::new(), TieredSinkConfig::new(&spool_path))
            .await
            .unwrap();

        tiered.send(b"hello").await.unwrap();

        assert_eq!((tiered.hot_path_count(), tiered.cold_path_count()), (1, 0));
        assert!(tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 1);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_cold_path_when_unavailable() {
        let dir = tempdir().unwrap();
        let tiered = TieredSink::new(TestSink::offline(), fail_fast_config(&dir, "test-queue"))
            .await
            .unwrap();

        // The first message trips the circuit and lands in the spool.
        tiered.send(b"hello").await.unwrap();

        assert_eq!(tiered.cold_path_count(), 1);
        assert!(!tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 0);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_circuit_opens_after_failures() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 3;
        let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

        // Two failures stay under the threshold.
        for payload in [b"1", b"2"] {
            tiered.send(payload).await.unwrap();
        }
        assert_eq!(tiered.circuit_state().await, CircuitState::Closed);

        // The third failure opens the circuit.
        tiered.send(b"3").await.unwrap();
        assert_eq!(tiered.circuit_state().await, CircuitState::Open);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_drain_recovers_messages() {
        let dir = tempdir().unwrap();
        let mut config = fail_fast_config(&dir, "test-queue");
        config.circuit_reset_timeout_ms = 50;
        config.drain_interval_ms = 10;

        let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

        tiered.send(b"recover me").await.unwrap();
        assert_eq!(tiered.spool_len().await, 1);

        // Bring the sink back and give the drainer time to run.
        tiered.inner().set_available(true);
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;

        assert!(tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 1);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_strict_fifo_waits_for_drain() {
        let dir = tempdir().unwrap();
        let mut config = fail_fast_config(&dir, "test-queue");
        config.ordering = OrderingMode::StrictFifo;

        let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

        // Spooled because the sink is down.
        tiered.send(b"first message").await.unwrap();
        assert_eq!(tiered.spool_len().await, 1);

        // Sink healthy and circuit closed again...
        tiered.inner().set_available(true);
        tiered.reset_circuit().await;

        // ...but StrictFifo still queues new work behind the spooled message.
        tiered.send(b"new message").await.unwrap();

        assert_eq!(tiered.spool_len().await, 2);
        assert_eq!(tiered.cold_path_count(), 2);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_max_spool_bytes_enforced() {
        let dir = tempdir().unwrap();
        let mut config = fail_fast_config(&dir, "test-bytes-limit");
        // Deliberately tiny byte budget.
        config.max_spool_bytes = Some(50);

        let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

        // The first (compressed) message fits.
        tiered.send(b"small").await.unwrap();
        assert_eq!(tiered.cold_path_count(), 1);
        assert!(tiered.spool_bytes() > 0);

        // Keep sending until the first error, then check it is the byte cap.
        let mut first_error = None;
        for _ in 0..100 {
            if let Err(err) = tiered.send(b"more data here").await {
                first_error = Some(err);
                break;
            }
        }
        match first_error {
            Some(TieredSinkError::SpoolFull(msg)) => assert!(msg.contains("max spool bytes")),
            Some(e) => panic!("unexpected error: {e}"),
            None => panic!("should have hit spool byte limit"),
        }

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_spool_bytes_decremented_on_drain() {
        let dir = tempdir().unwrap();
        let mut config = fail_fast_config(&dir, "test-bytes-drain");
        config.circuit_reset_timeout_ms = 50;
        config.drain_interval_ms = 10;

        let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

        for payload in [&b"drain me"[..], &b"drain me too"[..]] {
            tiered.send(payload).await.unwrap();
        }
        assert!(tiered.spool_bytes() > 0);

        // Bring the sink back and give the drainer time to empty the spool.
        tiered.inner().set_available(true);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        assert_eq!(tiered.spool_bytes(), 0);
        assert!(tiered.spool_is_empty().await);

        tiered.shutdown().await;
    }

    #[tokio::test]
    async fn test_spool_count_initialised_from_existing_queue() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-init-count");

        // Phase 1: spool three messages, then shut down cleanly.
        {
            let mut config = TieredSinkConfig::new(&spool_path);
            config.circuit_failure_threshold = 1;
            let tiered = TieredSink::new(TestSink::offline(), config).await.unwrap();

            for payload in [b"message 1", b"message 2", b"message 3"] {
                tiered.send(payload).await.unwrap();
            }
            assert_eq!(tiered.spool_len().await, 3);

            tiered.shutdown().await;
        }

        // Phase 2: reopening must recover the counters from disk.
        {
            let tiered = TieredSink::new(TestSink::new(), TieredSinkConfig::new(&spool_path))
                .await
                .unwrap();

            assert_eq!(tiered.spool_len().await, 3);
            assert!(tiered.spool_bytes() > 0);

            tiered.shutdown().await;
        }
    }

    #[tokio::test]
    async fn test_disk_available_flag() {
        let dir = tempdir().unwrap();
        let tiered =
            TieredSink::new(TestSink::offline(), fail_fast_config(&dir, "test-disk-flag"))
                .await
                .unwrap();

        // Disk starts out available.
        assert!(tiered.is_disk_available());

        // Simulate the poller flagging a full disk.
        tiered.disk_available.store(false, AtomicOrdering::Relaxed);

        let outcome = tiered.send(b"should fail").await;
        assert!(matches!(outcome, Err(TieredSinkError::DiskUnavailable)));

        tiered.shutdown().await;
    }

    /// C15 regression: N concurrent senders against a small cap.
    /// Pre-fix overshot; atomic reservation keeps total exact.
    #[tokio::test]
    async fn test_max_spool_items_not_overshooting_under_concurrency() {
        let dir = tempdir().unwrap();
        let mut config = fail_fast_config(&dir, "test-toctou-items");
        config.max_spool_items = Some(10);

        let tiered = Arc::new(TieredSink::new(TestSink::offline(), config).await.unwrap());

        // 100 concurrent senders racing for 10 slots.
        let joins: Vec<_> = (0..100)
            .map(|_| {
                let t = Arc::clone(&tiered);
                tokio::spawn(async move { t.send(b"contention").await })
            })
            .collect();

        let (mut accepted, mut rejected) = (0usize, 0usize);
        for join in joins {
            match join.await.unwrap() {
                Ok(()) => accepted += 1,
                Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
                Err(e) => panic!("unexpected error: {e}"),
            }
        }

        assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
        assert_eq!(rejected, 90);
        assert_eq!(tiered.spool_len().await, 10);

        let tiered = Arc::try_unwrap(tiered)
            .map_err(|_| "outstanding Arc refs")
            .unwrap();
        tiered.shutdown().await;
    }
}