Skip to main content

hyperi_rustlib/tiered_sink/
tiered.rs

1// Project:   hyperi-rustlib
2// File:      src/tiered_sink/tiered.rs
3// Purpose:   TieredSink implementation
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! TieredSink implementation.
10
11use crate::tiered_sink::{
12    CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13    TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
/// A tiered sink with automatic disk spillover.
///
/// Wraps any `Sink` implementation and automatically spills messages to disk
/// when the primary sink is unavailable or backpressuring. A background task
/// drains spooled messages back to the primary when it recovers.
///
/// Most state is held behind `Arc` because it is shared with the background
/// tasks spawned in `new`.
pub struct TieredSink<S: Sink> {
    /// The wrapped primary sink; the hot path calls `try_send` on it directly.
    sink: Arc<S>,
    /// Write side of the yaque spool; locked for each enqueue.
    spool_sender: Arc<Mutex<Sender>>,
    /// Receiver is owned by TieredSink but accessed via Arc clone by drainer task
    #[allow(dead_code)]
    spool_receiver: Arc<Mutex<Receiver>>,
    /// Number of spooled messages; seeded from the on-disk queue in `new`
    /// and shared with the drain task.
    spool_count: Arc<AtomicU64>,
    /// Sum of spooled (compressed) payload bytes; shared with the drain task.
    spool_bytes: Arc<AtomicU64>,
    /// Circuit breaker consulted before every hot-path attempt.
    circuit: Arc<CircuitBreaker>,
    /// Codec applied to payloads before they are written to the spool.
    codec: CompressionCodec,
    /// Configuration this sink was built with.
    config: TieredSinkConfig,
    /// Shutdown signal handed to the drain task and the disk poller.
    shutdown: Arc<Notify>,
    /// Handle of the drain task; taken and awaited by `shutdown`.
    drain_handle: Option<JoinHandle<()>>,
    /// Written by the disk-capacity poller; spooling is refused while `false`.
    disk_available: Arc<std::sync::atomic::AtomicBool>,
    /// Handle of the disk-capacity poller; `Some` only when
    /// `config.disk_aware` is set.
    #[allow(dead_code)]
    disk_poller_handle: Option<JoinHandle<()>>,

    /// Serialises send + drain in `StrictFifo`. `None` otherwise.
    fifo_gate: Option<Arc<Mutex<()>>>,

    // Metrics
    /// Messages delivered straight to the sink.
    hot_path_count: AtomicU64,
    /// Messages written to the spool.
    cold_path_count: AtomicU64,
}
51
52impl<S: Sink> TieredSink<S> {
53    /// Create a new TieredSink wrapping the given sink.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the spool file cannot be opened.
58    pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59        let (sender, receiver) =
60            yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61                path: config.spool_path.display().to_string(),
62                message: e.to_string(),
63            })?;
64
65        let sink = Arc::new(sink);
66        let spool_sender = Arc::new(Mutex::new(sender));
67        let spool_receiver = Arc::new(Mutex::new(receiver));
68
69        // Recover counters in spawn_blocking -- segment
70        // files can be GB-sized after a crash; walking them on the
71        // async runtime pins a tokio worker.
72        let spool_path_for_scan = config.spool_path.clone();
73        let (initial_count, initial_bytes) =
74            tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75                .await
76                .unwrap_or((0, 0));
77        let spool_count = Arc::new(AtomicU64::new(initial_count));
78        let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80        let circuit = Arc::new(CircuitBreaker::new(
81            config.circuit_failure_threshold,
82            config.circuit_reset_timeout(),
83        ));
84        let shutdown = Arc::new(Notify::new());
85        let codec = config.compression;
86        let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88        // Shared by senders + drainer in StrictFifo to enforce
89        // total ordering at the sink.
90        let fifo_gate =
91            matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93        // Start disk-aware capacity poller if configured
94        let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95            let spool_path = config.spool_path.clone();
96            let disk_flag = Arc::clone(&disk_available);
97            let shutdown_clone = Arc::clone(&shutdown);
98            let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99            let max_usage = disk_cfg.max_usage_percent;
100
101            tokio::spawn(disk_capacity_poller(
102                spool_path,
103                disk_flag,
104                max_usage,
105                poll_interval,
106                shutdown_clone,
107            ))
108        });
109
110        // Start drain task
111        let drain_handle = tokio::spawn(drainer::drain_loop(
112            Arc::clone(&sink),
113            Arc::clone(&spool_receiver),
114            Arc::clone(&spool_count),
115            Arc::clone(&spool_bytes),
116            Arc::clone(&circuit),
117            codec,
118            config.drain_strategy,
119            config.drain_interval(),
120            Arc::clone(&shutdown),
121            fifo_gate.as_ref().map(Arc::clone),
122        ));
123
124        Ok(Self {
125            sink,
126            spool_sender,
127            spool_receiver,
128            spool_count,
129            spool_bytes,
130            circuit,
131            codec,
132            config,
133            shutdown,
134            drain_handle: Some(drain_handle),
135            disk_available,
136            disk_poller_handle,
137            fifo_gate,
138            hot_path_count: AtomicU64::new(0),
139            cold_path_count: AtomicU64::new(0),
140        })
141    }
142
143    /// Send a message through the tiered sink.
144    ///
145    /// The message goes through the hot path (direct to sink) if:
146    /// - Circuit is closed AND ordering mode is Interleaved
147    /// - OR circuit is closed AND ordering is StrictFifo AND spool is empty
148    ///
149    /// Otherwise, the message is spooled to disk.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if:
154    /// - The sink returns a fatal error
155    /// - The spool is full
156    /// - Compression fails
157    pub async fn send(&self, data: &[u8]) -> Result<()> {
158        // StrictFifo: serialise decision + send + enqueue against
159        // other senders and the drainer. Interleaved: no gate.
160        let _gate = match &self.fifo_gate {
161            Some(gate) => Some(gate.lock().await),
162            None => None,
163        };
164
165        let use_hot_path = self.should_use_hot_path().await;
166
167        if use_hot_path {
168            match self.try_hot_path(data).await {
169                Ok(()) => {
170                    self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171                    #[cfg(feature = "metrics")]
172                    ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173                    return Ok(());
174                }
175                Err(TieredSinkError::Sink(_)) => {
176                    // Fatal error, don't spool
177                    return Err(TieredSinkError::Sink("fatal sink error".into()));
178                }
179                Err(_) => {
180                    // Retryable error, fall through to spool
181                }
182            }
183        }
184
185        // Cold path: spool to disk
186        self.spool_message(data).await?;
187        self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188        #[cfg(feature = "metrics")]
189        ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190        Ok(())
191    }
192
193    /// Determine if we should attempt the hot path.
194    async fn should_use_hot_path(&self) -> bool {
195        let circuit_state = self.circuit.state().await;
196
197        #[cfg(feature = "metrics")]
198        ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199            CircuitState::Closed => 0.0,
200            CircuitState::HalfOpen => 1.0,
201            CircuitState::Open => 2.0,
202        });
203
204        match circuit_state {
205            CircuitState::Open => false,
206            CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207                OrderingMode::Interleaved => true,
208                OrderingMode::StrictFifo => {
209                    // Only use hot path if spool is empty
210                    self.spool_count.load(AtomicOrdering::Relaxed) == 0
211                }
212            },
213        }
214    }
215
216    /// Try to send via hot path (direct to sink).
217    async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218        let send_timeout = self.config.send_timeout_duration();
219
220        match timeout(send_timeout, self.sink.try_send(data)).await {
221            Ok(Ok(())) => {
222                self.circuit.record_success().await;
223                Ok(())
224            }
225            Ok(Err(SinkError::Full)) => {
226                // Backpressure, don't count as failure for circuit
227                Err(TieredSinkError::Spool("sink full".into()))
228            }
229            Ok(Err(SinkError::Unavailable)) => {
230                self.circuit.record_failure().await;
231                #[cfg(feature = "metrics")]
232                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233                Err(TieredSinkError::Spool("sink unavailable".into()))
234            }
235            Ok(Err(SinkError::Fatal(e))) => {
236                // Fatal error - propagate, don't spool
237                Err(TieredSinkError::Sink(e.to_string()))
238            }
239            Err(_timeout) => {
240                self.circuit.record_failure().await;
241                #[cfg(feature = "metrics")]
242                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243                Err(TieredSinkError::Spool("send timeout".into()))
244            }
245        }
246    }
247
    /// Compress `data` and append it to the on-disk spool.
    ///
    /// Reserve capacity (`fetch_update`) before enqueue; roll back
    /// on failure. Atomic reservation prevents two concurrent
    /// callers from both passing the cap check and overshooting.
    ///
    /// # Errors
    ///
    /// - `TieredSinkError::DiskUnavailable` if the disk flag is cleared
    /// - whatever `codec.compress` returns on failure
    /// - `TieredSinkError::SpoolFull` if the item or byte cap would be exceeded
    /// - `TieredSinkError::Spool` if the queue write itself fails
    async fn spool_message(&self, data: &[u8]) -> Result<()> {
        // Check disk availability first
        if !self.disk_available.load(AtomicOrdering::Relaxed) {
            return Err(TieredSinkError::DiskUnavailable);
        }

        // Caps are accounted in compressed bytes, so compress before reserving.
        let compressed = self.codec.compress(data)?;
        let compressed_len = compressed.len() as u64;

        // Reserve item slot.
        if let Some(max_items) = self.config.max_spool_items {
            let max_items_u64 = max_items as u64;
            self.spool_count
                .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
                    if cur < max_items_u64 {
                        Some(cur + 1)
                    } else {
                        None
                    }
                })
                .map_err(|_| {
                    TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
                })?;
        } else {
            // No cap configured: count unconditionally.
            self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
        }

        // Reserve byte budget; roll back item slot on failure.
        if let Some(max_bytes) = self.config.max_spool_bytes {
            if let Err(current_bytes) = self.spool_bytes.fetch_update(
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
                |cur| {
                    // `None` (reject) on u64 overflow as well as on exceeding the cap.
                    cur.checked_add(compressed_len)
                        .filter(|new| *new <= max_bytes)
                },
            ) {
                self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
                return Err(TieredSinkError::SpoolFull(format!(
                    "max spool bytes {max_bytes} reached (current: {current_bytes}, \
                     new message: {compressed_len})"
                )));
            }
        } else {
            self.spool_bytes
                .fetch_add(compressed_len, AtomicOrdering::AcqRel);
        }

        // Enqueue; roll back both reservations on failure.
        let mut sender = self.spool_sender.lock().await;
        if let Err(e) = sender.send(compressed).await {
            // Release the sender lock before touching the counters.
            drop(sender);
            self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
            self.spool_bytes
                .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
            return Err(TieredSinkError::Spool(e.to_string()));
        }
        drop(sender);

        #[cfg(feature = "metrics")]
        {
            ::metrics::gauge!("dfe_spool_messages")
                .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
            ::metrics::gauge!("dfe_spool_bytes")
                .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
        }

        #[cfg(feature = "tracing")]
        tracing::debug!(
            spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
            spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
            "Message spooled to disk"
        );

        Ok(())
    }
327
328    /// Get the number of messages currently in the spool.
329    #[allow(clippy::cast_possible_truncation)]
330    pub async fn spool_len(&self) -> usize {
331        self.spool_count.load(AtomicOrdering::Relaxed) as usize
332    }
333
334    /// Check if the spool is empty.
335    pub async fn spool_is_empty(&self) -> bool {
336        self.spool_count.load(AtomicOrdering::Relaxed) == 0
337    }
338
339    /// Get the approximate number of bytes currently in the spool.
340    #[must_use]
341    pub fn spool_bytes(&self) -> u64 {
342        self.spool_bytes.load(AtomicOrdering::Relaxed)
343    }
344
345    /// Check if disk is available for spooling.
346    #[must_use]
347    pub fn is_disk_available(&self) -> bool {
348        self.disk_available.load(AtomicOrdering::Relaxed)
349    }
350
351    /// Get the current circuit breaker state.
352    pub async fn circuit_state(&self) -> CircuitState {
353        self.circuit.state().await
354    }
355
356    /// Get hot path message count.
357    #[must_use]
358    pub fn hot_path_count(&self) -> u64 {
359        self.hot_path_count.load(AtomicOrdering::Relaxed)
360    }
361
362    /// Get cold path (spooled) message count.
363    #[must_use]
364    pub fn cold_path_count(&self) -> u64 {
365        self.cold_path_count.load(AtomicOrdering::Relaxed)
366    }
367
368    /// Get a reference to the underlying sink.
369    pub fn inner(&self) -> &S {
370        &self.sink
371    }
372
373    /// Manually reset the circuit breaker.
374    pub async fn reset_circuit(&self) {
375        self.circuit.reset().await;
376    }
377
378    /// Shutdown the drain task gracefully.
379    pub async fn shutdown(mut self) {
380        self.shutdown.notify_one();
381        if let Some(handle) = self.drain_handle.take() {
382            let _ = handle.await;
383        }
384    }
385}
386
/// Count existing items and sum payload bytes in a yaque queue directory.
///
/// yaque stores messages as `[4-byte Hamming header][payload]` in segment files
/// named `<n>.q`. The receiver position is persisted in `recv-metadata`
/// (two big-endian `u64`: segment, position).
///
/// Only segments at or after the receiver's segment are scanned, and the
/// receiver's own segment is scanned from the persisted position. Scanning
/// of a segment stops at the EOF marker or at the first record that does
/// not fit in the file. Unreadable files and malformed metadata are
/// tolerated: a missing/short metadata file means "start from 0", and a
/// persisted position beyond the end of its segment contributes nothing
/// instead of aborting the whole scan.
///
/// Returns `(item_count, payload_bytes)`; `(0, 0)` if `path` is not a directory.
fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
    const HEADER_LEN: usize = 4;
    const HEADER_EOF: [u8; HEADER_LEN] = [255; HEADER_LEN];
    // Low 26 bits of the header carry the payload length.
    const PAYLOAD_LEN_MASK: u32 = 0x03_FF_FF_FF;

    if !path.is_dir() {
        return (0, 0);
    }

    // Receiver state; any read or parse problem falls back to the start.
    let (recv_segment, recv_position) = std::fs::read(path.join("recv-metadata"))
        .ok()
        .and_then(|data| {
            let segment = u64::from_be_bytes(data.get(0..8)?.try_into().ok()?);
            let position = u64::from_be_bytes(data.get(8..16)?.try_into().ok()?);
            Some((segment, position))
        })
        .unwrap_or((0, 0));

    // Segment files (`<n>.q`) at or after the receiver's segment, in order.
    let mut segments: Vec<u64> = std::fs::read_dir(path)
        .into_iter()
        .flatten()
        .flatten()
        .map(|entry| entry.path())
        .filter(|file| file.extension().and_then(|ext| ext.to_str()) == Some("q"))
        .filter_map(|file| file.file_stem()?.to_str()?.parse::<u64>().ok())
        .filter(|&seg_num| seg_num >= recv_segment)
        .collect();
    segments.sort_unstable();

    let mut count = 0u64;
    let mut bytes = 0u64;

    for seg_num in segments {
        let Ok(file_data) = std::fs::read(path.join(format!("{seg_num}.q"))) else {
            continue;
        };

        // A position that does not fit in `usize` is clamped; it is then
        // beyond any file and the checked header read below finds nothing.
        let mut pos = if seg_num == recv_segment {
            usize::try_from(recv_position).unwrap_or(usize::MAX)
        } else {
            0
        };

        // Checked add + checked slice: a corrupt start offset can neither
        // overflow nor index out of bounds.
        while let Some(header) = pos
            .checked_add(HEADER_LEN)
            .and_then(|end| file_data.get(pos..end))
        {
            if header == HEADER_EOF {
                break;
            }
            let encoded = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
            let payload_len = (encoded & PAYLOAD_LEN_MASK) as usize;
            // Cannot overflow: `pos` is inside the file and the payload
            // length is at most 26 bits.
            pos += HEADER_LEN + payload_len;
            // Only count records whose payload is fully present.
            if pos <= file_data.len() {
                count += 1;
                bytes += payload_len as u64;
            }
        }
    }

    (count, bytes)
}
470
471/// Check available disk space using `statvfs`.
472///
473/// Returns `(total_bytes, available_bytes)` for the filesystem containing `path`.
474///
475/// # Safety
476///
477/// Calls `libc::statvfs` which is unsafe but well-defined when given a valid path.
478#[allow(unsafe_code)]
479fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
480    use std::ffi::CString;
481    let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
482
483    // SAFETY: zeroed statvfs is a valid initialisation for the struct
484    // before passing to libc::statvfs which fills all fields.
485    // c_path is a valid null-terminated C string pointing to an existing
486    // filesystem path, and stat is a properly-sized statvfs struct.
487    #[allow(unsafe_code)]
488    let stat = unsafe {
489        let mut stat: libc::statvfs = std::mem::zeroed();
490        let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
491        if result != 0 {
492            return None;
493        }
494        stat
495    };
496
497    // Portability: libc::statvfs field widths differ across platforms.
498    // Linux: f_blocks/f_bavail/f_frsize are all u64. macOS (aarch64): f_frsize
499    // is c_ulong (u64) but f_blocks/f_bavail are fsblkcnt_t (u32), so only those
500    // two need the widening cast here; f_frsize is already u64. Fixes #39.
501    #[cfg(target_os = "macos")]
502    {
503        let block_size: u64 = stat.f_frsize;
504        let total: u64 = u64::from(stat.f_blocks) * block_size;
505        let available: u64 = u64::from(stat.f_bavail) * block_size;
506        Some((total, available))
507    }
508    #[cfg(not(target_os = "macos"))]
509    {
510        let block_size = stat.f_frsize;
511        let total = stat.f_blocks * block_size;
512        let available = stat.f_bavail * block_size;
513        Some((total, available))
514    }
515}
516
517/// Background poller that checks disk usage and sets a flag.
518async fn disk_capacity_poller(
519    spool_path: std::path::PathBuf,
520    disk_available: Arc<std::sync::atomic::AtomicBool>,
521    max_usage_percent: f64,
522    poll_interval: std::time::Duration,
523    shutdown: Arc<Notify>,
524) {
525    loop {
526        tokio::select! {
527            () = shutdown.notified() => {
528                #[cfg(feature = "tracing")]
529                tracing::debug!("Disk capacity poller shutting down");
530                return;
531            }
532            () = tokio::time::sleep(poll_interval) => {}
533        }
534
535        let disk_space = check_disk_space(&spool_path);
536
537        #[cfg(feature = "metrics")]
538        if let Some((total, avail)) = disk_space {
539            ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
540            ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
541        }
542
543        let available = disk_space.is_none_or(|(total, avail)| {
544            if total == 0 {
545                return true;
546            }
547            let used_ratio = 1.0 - (avail as f64 / total as f64);
548            let ok = used_ratio < max_usage_percent;
549            #[cfg(feature = "tracing")]
550            if !ok {
551                tracing::warn!(
552                    used_percent = format!("{:.1}%", used_ratio * 100.0),
553                    threshold = format!("{:.1}%", max_usage_percent * 100.0),
554                    "Disk usage exceeds threshold, pausing spool writes"
555                );
556            }
557            ok
558        });
559
560        disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
561    }
562}
563
564impl<S: Sink> Drop for TieredSink<S> {
565    fn drop(&mut self) {
566        // Durability requires explicit `shutdown().await`.
567        // Drop can only notify the background drainer -- it can't
568        // await it. If anything's still spooled at drop time, the
569        // caller has skipped the explicit shutdown and risks losing
570        // that data. Warn + count so this shows up in dashboards.
571        let pending = self.spool_count.load(AtomicOrdering::Relaxed);
572        if pending > 0 {
573            #[cfg(feature = "metrics")]
574            ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
575            #[cfg(feature = "tracing")]
576            tracing::warn!(
577                pending,
578                "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
579            );
580        }
581        self.shutdown.notify_one();
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588    use std::sync::atomic::AtomicBool;
589    use tempfile::tempdir;
590
    /// Minimal error type used as `TestSink`'s associated `Error`.
    #[derive(Debug)]
    struct TestError(String);

    // Display prints the wrapped message verbatim.
    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    impl std::error::Error for TestError {}
601
    /// In-memory sink whose availability can be toggled by the test.
    struct TestSink {
        /// When `false`, `try_send` reports `SinkError::Unavailable`.
        available: AtomicBool,
        /// Every payload accepted so far, in arrival order.
        received: Mutex<Vec<Vec<u8>>>,
    }

    impl TestSink {
        /// Create an available sink with nothing received.
        fn new() -> Self {
            Self {
                available: AtomicBool::new(true),
                received: Mutex::new(Vec::new()),
            }
        }

        /// Flip the sink between accepting and rejecting sends.
        fn set_available(&self, available: bool) {
            self.available.store(available, AtomicOrdering::SeqCst);
        }

        /// Number of payloads the sink has accepted.
        async fn received_count(&self) -> usize {
            self.received.lock().await.len()
        }
    }

    impl Sink for TestSink {
        type Error = TestError;

        /// Record the payload when available; otherwise report `Unavailable`.
        async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
            if self.available.load(AtomicOrdering::SeqCst) {
                self.received.lock().await.push(data.to_vec());
                Ok(())
            } else {
                Err(SinkError::Unavailable)
            }
        }
    }
636
    /// With an available sink, a send goes straight through and nothing is spooled.
    #[tokio::test]
    async fn test_hot_path_when_available() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let sink = TestSink::new();
        let config = TieredSinkConfig::new(&spool_path);

        let tiered = TieredSink::new(sink, config).await.unwrap();

        tiered.send(b"hello").await.unwrap();

        assert_eq!(tiered.hot_path_count(), 1);
        assert_eq!(tiered.cold_path_count(), 0);
        assert!(tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 1);

        tiered.shutdown().await;
    }
656
    /// With an unavailable sink, the send still succeeds by spooling to disk.
    #[tokio::test]
    async fn test_cold_path_when_unavailable() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1; // Open circuit quickly

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // First message triggers circuit open
        tiered.send(b"hello").await.unwrap();

        // Should have spooled
        assert_eq!(tiered.cold_path_count(), 1);
        assert!(!tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 0);

        tiered.shutdown().await;
    }
680
    /// The circuit stays closed below the failure threshold and opens on reaching it.
    #[tokio::test]
    async fn test_circuit_opens_after_failures() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 3;

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // First two fail but circuit stays closed
        tiered.send(b"1").await.unwrap();
        tiered.send(b"2").await.unwrap();
        assert_eq!(tiered.circuit_state().await, CircuitState::Closed);

        // Third failure opens circuit
        tiered.send(b"3").await.unwrap();
        assert_eq!(tiered.circuit_state().await, CircuitState::Open);

        tiered.shutdown().await;
    }
705
    /// A spooled message is delivered by the drain task once the sink recovers.
    ///
    /// NOTE(review): relies on a fixed 200 ms sleep being long enough for the
    /// circuit reset (50 ms) plus a drain pass -- may be timing-sensitive on
    /// slow machines.
    #[tokio::test]
    async fn test_drain_recovers_messages() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;
        config.circuit_reset_timeout_ms = 50;
        config.drain_interval_ms = 10;

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // Spool a message
        tiered.send(b"recover me").await.unwrap();
        assert_eq!(tiered.spool_len().await, 1);

        // Make sink available and wait for drain
        tiered.inner().set_available(true);
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;

        // Should have drained
        assert!(tiered.spool_is_empty().await);
        assert_eq!(tiered.inner().received_count().await, 1);

        tiered.shutdown().await;
    }
735
    /// In `StrictFifo`, a new message is spooled behind existing spool
    /// content even though the sink is healthy again.
    ///
    /// NOTE(review): the final count assumes the drain task has not yet
    /// delivered the first message between `reset_circuit` and the assertion
    /// -- confirm the default drain interval makes this reliable.
    #[tokio::test]
    async fn test_strict_fifo_waits_for_drain() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-queue");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.ordering = OrderingMode::StrictFifo;
        config.circuit_failure_threshold = 1;

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // First message gets spooled due to unavailable sink
        tiered.send(b"first message").await.unwrap();
        assert_eq!(tiered.spool_len().await, 1);

        // Make sink available again
        tiered.inner().set_available(true);
        tiered.reset_circuit().await;

        // In StrictFifo mode, new messages should spool while spool is non-empty
        tiered.send(b"new message").await.unwrap();

        // Should still be 2 messages in spool (strict FIFO queues new messages behind old)
        assert_eq!(tiered.spool_len().await, 2);
        assert_eq!(tiered.cold_path_count(), 2);

        tiered.shutdown().await;
    }
767
    /// Spooling is rejected with `SpoolFull` once the byte cap would be exceeded.
    #[tokio::test]
    async fn test_max_spool_bytes_enforced() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-bytes-limit");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;
        // Set a very small byte limit
        config.max_spool_bytes = Some(50);

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // First message should spool (compressed size fits)
        tiered.send(b"small").await.unwrap();
        assert_eq!(tiered.cold_path_count(), 1);
        assert!(tiered.spool_bytes() > 0);

        // Keep sending until we hit the limit
        let mut hit_limit = false;
        for _ in 0..100 {
            match tiered.send(b"more data here").await {
                Ok(()) => {}
                Err(TieredSinkError::SpoolFull(msg)) => {
                    // The byte cap (not the item cap) must be the one reported.
                    assert!(msg.contains("max spool bytes"));
                    hit_limit = true;
                    break;
                }
                Err(e) => panic!("unexpected error: {e}"),
            }
        }
        assert!(hit_limit, "should have hit spool byte limit");

        tiered.shutdown().await;
    }
805
    /// The byte counter returns to zero after the drain task empties the spool.
    ///
    /// NOTE(review): uses a fixed 300 ms sleep to wait for the drain -- may
    /// be timing-sensitive on slow machines.
    #[tokio::test]
    async fn test_spool_bytes_decremented_on_drain() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-bytes-drain");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;
        config.circuit_reset_timeout_ms = 50;
        config.drain_interval_ms = 10;

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // Spool some messages
        tiered.send(b"drain me").await.unwrap();
        tiered.send(b"drain me too").await.unwrap();
        let bytes_after_spool = tiered.spool_bytes();
        assert!(bytes_after_spool > 0);

        // Make sink available and wait for drain
        tiered.inner().set_available(true);
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;

        // Bytes should be decremented
        assert_eq!(tiered.spool_bytes(), 0);
        assert!(tiered.spool_is_empty().await);

        tiered.shutdown().await;
    }
837
    /// Re-opening a spool directory recovers the item and byte counters from disk.
    #[tokio::test]
    async fn test_spool_count_initialised_from_existing_queue() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-init-count");

        // Phase 1: Create a TieredSink, spool messages, then drop
        {
            let sink = TestSink::new();
            // Keep the sink down so nothing is drained before shutdown.
            sink.set_available(false);

            let mut config = TieredSinkConfig::new(&spool_path);
            config.circuit_failure_threshold = 1;

            let tiered = TieredSink::new(sink, config).await.unwrap();

            tiered.send(b"message 1").await.unwrap();
            tiered.send(b"message 2").await.unwrap();
            tiered.send(b"message 3").await.unwrap();
            assert_eq!(tiered.spool_len().await, 3);

            tiered.shutdown().await;
        }

        // Phase 2: Re-open -- spool_count should reflect existing items
        {
            let sink = TestSink::new();
            let config = TieredSinkConfig::new(&spool_path);
            let tiered = TieredSink::new(sink, config).await.unwrap();

            assert_eq!(tiered.spool_len().await, 3);
            assert!(tiered.spool_bytes() > 0);

            tiered.shutdown().await;
        }
    }
873
    /// Clearing the disk-availability flag makes spooling fail with `DiskUnavailable`.
    #[tokio::test]
    async fn test_disk_available_flag() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-disk-flag");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;

        let tiered = TieredSink::new(sink, config).await.unwrap();

        // By default, disk should be available
        assert!(tiered.is_disk_available());

        // Manually set flag to false to simulate full disk
        tiered.disk_available.store(false, AtomicOrdering::Relaxed);

        // Sink is down, so the send must fall back to the (blocked) spool.
        let result = tiered.send(b"should fail").await;
        assert!(matches!(result, Err(TieredSinkError::DiskUnavailable)));

        tiered.shutdown().await;
    }
898
    /// C15 regression: N concurrent senders against a small cap.
    /// Pre-fix overshot; atomic reservation keeps total exact.
    ///
    /// The sink is unavailable throughout, so every send ends up on the
    /// spool path and competes for the same 10 item slots.
    #[tokio::test]
    async fn test_max_spool_items_not_overshooting_under_concurrency() {
        let dir = tempdir().unwrap();
        let spool_path = dir.path().join("test-toctou-items");

        let sink = TestSink::new();
        sink.set_available(false);

        let mut config = TieredSinkConfig::new(&spool_path);
        config.circuit_failure_threshold = 1;
        config.max_spool_items = Some(10);

        let tiered = Arc::new(TieredSink::new(sink, config).await.unwrap());

        // Fan out 100 concurrent senders against a cap of 10.
        let mut joins = Vec::new();
        for _ in 0..100 {
            let t = Arc::clone(&tiered);
            joins.push(tokio::spawn(async move { t.send(b"contention").await }));
        }

        let mut accepted = 0usize;
        let mut rejected = 0usize;
        for j in joins {
            match j.await.unwrap() {
                Ok(()) => accepted += 1,
                Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
                Err(e) => panic!("unexpected error: {e}"),
            }
        }

        assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
        assert_eq!(rejected, 90);
        assert_eq!(tiered.spool_len().await, 10);

        // All sender tasks have finished, so this is the only remaining Arc
        // and `shutdown` (which takes `self` by value) can be called.
        let tiered = Arc::try_unwrap(tiered)
            .map_err(|_| "outstanding Arc refs")
            .unwrap();
        tiered.shutdown().await;
    }
941}