Skip to main content

hyperi_rustlib/tiered_sink/
tiered.rs

1// Project:   hyperi-rustlib
2// File:      src/tiered_sink/tiered.rs
3// Purpose:   TieredSink implementation
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! TieredSink implementation.
10
11use crate::tiered_sink::{
12    CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13    TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
22/// A tiered sink with automatic disk spillover.
23///
24/// Wraps any `Sink` implementation and automatically spills messages to disk
25/// when the primary sink is unavailable or backpressuring. A background task
26/// drains spooled messages back to the primary when it recovers.
27pub struct TieredSink<S: Sink> {
28    sink: Arc<S>,
29    spool_sender: Arc<Mutex<Sender>>,
30    /// Receiver is owned by TieredSink but accessed via Arc clone by drainer task
31    #[allow(dead_code)]
32    spool_receiver: Arc<Mutex<Receiver>>,
33    spool_count: Arc<AtomicU64>,
34    spool_bytes: Arc<AtomicU64>,
35    circuit: Arc<CircuitBreaker>,
36    codec: CompressionCodec,
37    config: TieredSinkConfig,
38    shutdown: Arc<Notify>,
39    drain_handle: Option<JoinHandle<()>>,
40    disk_available: Arc<std::sync::atomic::AtomicBool>,
41    #[allow(dead_code)]
42    disk_poller_handle: Option<JoinHandle<()>>,
43
44    /// Serialises send + drain in `StrictFifo`. `None` otherwise.
45    fifo_gate: Option<Arc<Mutex<()>>>,
46
47    // Metrics
48    hot_path_count: AtomicU64,
49    cold_path_count: AtomicU64,
50}
51
52impl<S: Sink> TieredSink<S> {
53    /// Create a new TieredSink wrapping the given sink.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the spool file cannot be opened.
58    pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59        let (sender, receiver) =
60            yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61                path: config.spool_path.display().to_string(),
62                message: e.to_string(),
63            })?;
64
65        let sink = Arc::new(sink);
66        let spool_sender = Arc::new(Mutex::new(sender));
67        let spool_receiver = Arc::new(Mutex::new(receiver));
68
69        // Codex F16: recover counters in spawn_blocking -- segment
70        // files can be GB-sized after a crash; walking them on the
71        // async runtime pins a tokio worker.
72        let spool_path_for_scan = config.spool_path.clone();
73        let (initial_count, initial_bytes) =
74            tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75                .await
76                .unwrap_or((0, 0));
77        let spool_count = Arc::new(AtomicU64::new(initial_count));
78        let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80        let circuit = Arc::new(CircuitBreaker::new(
81            config.circuit_failure_threshold,
82            config.circuit_reset_timeout(),
83        ));
84        let shutdown = Arc::new(Notify::new());
85        let codec = config.compression;
86        let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88        // Shared by senders + drainer in StrictFifo to enforce
89        // total ordering at the sink.
90        let fifo_gate =
91            matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93        // Start disk-aware capacity poller if configured
94        let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95            let spool_path = config.spool_path.clone();
96            let disk_flag = Arc::clone(&disk_available);
97            let shutdown_clone = Arc::clone(&shutdown);
98            let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99            let max_usage = disk_cfg.max_usage_percent;
100
101            tokio::spawn(disk_capacity_poller(
102                spool_path,
103                disk_flag,
104                max_usage,
105                poll_interval,
106                shutdown_clone,
107            ))
108        });
109
110        // Start drain task
111        let drain_handle = tokio::spawn(drainer::drain_loop(
112            Arc::clone(&sink),
113            Arc::clone(&spool_receiver),
114            Arc::clone(&spool_count),
115            Arc::clone(&spool_bytes),
116            Arc::clone(&circuit),
117            codec,
118            config.drain_strategy,
119            config.drain_interval(),
120            Arc::clone(&shutdown),
121            fifo_gate.as_ref().map(Arc::clone),
122        ));
123
124        Ok(Self {
125            sink,
126            spool_sender,
127            spool_receiver,
128            spool_count,
129            spool_bytes,
130            circuit,
131            codec,
132            config,
133            shutdown,
134            drain_handle: Some(drain_handle),
135            disk_available,
136            disk_poller_handle,
137            fifo_gate,
138            hot_path_count: AtomicU64::new(0),
139            cold_path_count: AtomicU64::new(0),
140        })
141    }
142
143    /// Send a message through the tiered sink.
144    ///
145    /// The message goes through the hot path (direct to sink) if:
146    /// - Circuit is closed AND ordering mode is Interleaved
147    /// - OR circuit is closed AND ordering is StrictFifo AND spool is empty
148    ///
149    /// Otherwise, the message is spooled to disk.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if:
154    /// - The sink returns a fatal error
155    /// - The spool is full
156    /// - Compression fails
157    pub async fn send(&self, data: &[u8]) -> Result<()> {
158        // StrictFifo: serialise decision + send + enqueue against
159        // other senders and the drainer. Interleaved: no gate.
160        let _gate = match &self.fifo_gate {
161            Some(gate) => Some(gate.lock().await),
162            None => None,
163        };
164
165        let use_hot_path = self.should_use_hot_path().await;
166
167        if use_hot_path {
168            match self.try_hot_path(data).await {
169                Ok(()) => {
170                    self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171                    #[cfg(feature = "metrics")]
172                    ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173                    return Ok(());
174                }
175                Err(TieredSinkError::Sink(_)) => {
176                    // Fatal error, don't spool
177                    return Err(TieredSinkError::Sink("fatal sink error".into()));
178                }
179                Err(_) => {
180                    // Retryable error, fall through to spool
181                }
182            }
183        }
184
185        // Cold path: spool to disk
186        self.spool_message(data).await?;
187        self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188        #[cfg(feature = "metrics")]
189        ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190        Ok(())
191    }
192
193    /// Determine if we should attempt the hot path.
194    async fn should_use_hot_path(&self) -> bool {
195        let circuit_state = self.circuit.state().await;
196
197        #[cfg(feature = "metrics")]
198        ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199            CircuitState::Closed => 0.0,
200            CircuitState::HalfOpen => 1.0,
201            CircuitState::Open => 2.0,
202        });
203
204        match circuit_state {
205            CircuitState::Open => false,
206            CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207                OrderingMode::Interleaved => true,
208                OrderingMode::StrictFifo => {
209                    // Only use hot path if spool is empty
210                    self.spool_count.load(AtomicOrdering::Relaxed) == 0
211                }
212            },
213        }
214    }
215
216    /// Try to send via hot path (direct to sink).
217    async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218        let send_timeout = self.config.send_timeout_duration();
219
220        match timeout(send_timeout, self.sink.try_send(data)).await {
221            Ok(Ok(())) => {
222                self.circuit.record_success().await;
223                Ok(())
224            }
225            Ok(Err(SinkError::Full)) => {
226                // Backpressure, don't count as failure for circuit
227                Err(TieredSinkError::Spool("sink full".into()))
228            }
229            Ok(Err(SinkError::Unavailable)) => {
230                self.circuit.record_failure().await;
231                #[cfg(feature = "metrics")]
232                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233                Err(TieredSinkError::Spool("sink unavailable".into()))
234            }
235            Ok(Err(SinkError::Fatal(e))) => {
236                // Fatal error - propagate, don't spool
237                Err(TieredSinkError::Sink(e.to_string()))
238            }
239            Err(_timeout) => {
240                self.circuit.record_failure().await;
241                #[cfg(feature = "metrics")]
242                ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243                Err(TieredSinkError::Spool("send timeout".into()))
244            }
245        }
246    }
247
248    /// Reserve capacity (`fetch_update`) before enqueue; roll back
249    /// on failure. Atomic reservation prevents two concurrent
250    /// callers from both passing the cap check and overshooting.
251    async fn spool_message(&self, data: &[u8]) -> Result<()> {
252        // Check disk availability first
253        if !self.disk_available.load(AtomicOrdering::Relaxed) {
254            return Err(TieredSinkError::DiskUnavailable);
255        }
256
257        let compressed = self.codec.compress(data)?;
258        let compressed_len = compressed.len() as u64;
259
260        // Reserve item slot.
261        if let Some(max_items) = self.config.max_spool_items {
262            let max_items_u64 = max_items as u64;
263            self.spool_count
264                .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
265                    if cur < max_items_u64 {
266                        Some(cur + 1)
267                    } else {
268                        None
269                    }
270                })
271                .map_err(|_| {
272                    TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
273                })?;
274        } else {
275            self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
276        }
277
278        // Reserve byte budget; roll back item slot on failure.
279        if let Some(max_bytes) = self.config.max_spool_bytes {
280            if let Err(current_bytes) = self.spool_bytes.fetch_update(
281                AtomicOrdering::AcqRel,
282                AtomicOrdering::Acquire,
283                |cur| {
284                    cur.checked_add(compressed_len)
285                        .filter(|new| *new <= max_bytes)
286                },
287            ) {
288                self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
289                return Err(TieredSinkError::SpoolFull(format!(
290                    "max spool bytes {max_bytes} reached (current: {current_bytes}, \
291                     new message: {compressed_len})"
292                )));
293            }
294        } else {
295            self.spool_bytes
296                .fetch_add(compressed_len, AtomicOrdering::AcqRel);
297        }
298
299        // Enqueue; roll back both reservations on failure.
300        let mut sender = self.spool_sender.lock().await;
301        if let Err(e) = sender.send(compressed).await {
302            drop(sender);
303            self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
304            self.spool_bytes
305                .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
306            return Err(TieredSinkError::Spool(e.to_string()));
307        }
308        drop(sender);
309
310        #[cfg(feature = "metrics")]
311        {
312            ::metrics::gauge!("dfe_spool_messages")
313                .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
314            ::metrics::gauge!("dfe_spool_bytes")
315                .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
316        }
317
318        #[cfg(feature = "tracing")]
319        tracing::debug!(
320            spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
321            spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
322            "Message spooled to disk"
323        );
324
325        Ok(())
326    }
327
328    /// Get the number of messages currently in the spool.
329    #[allow(clippy::cast_possible_truncation)]
330    pub async fn spool_len(&self) -> usize {
331        self.spool_count.load(AtomicOrdering::Relaxed) as usize
332    }
333
334    /// Check if the spool is empty.
335    pub async fn spool_is_empty(&self) -> bool {
336        self.spool_count.load(AtomicOrdering::Relaxed) == 0
337    }
338
339    /// Get the approximate number of bytes currently in the spool.
340    #[must_use]
341    pub fn spool_bytes(&self) -> u64 {
342        self.spool_bytes.load(AtomicOrdering::Relaxed)
343    }
344
345    /// Check if disk is available for spooling.
346    #[must_use]
347    pub fn is_disk_available(&self) -> bool {
348        self.disk_available.load(AtomicOrdering::Relaxed)
349    }
350
351    /// Get the current circuit breaker state.
352    pub async fn circuit_state(&self) -> CircuitState {
353        self.circuit.state().await
354    }
355
356    /// Get hot path message count.
357    #[must_use]
358    pub fn hot_path_count(&self) -> u64 {
359        self.hot_path_count.load(AtomicOrdering::Relaxed)
360    }
361
362    /// Get cold path (spooled) message count.
363    #[must_use]
364    pub fn cold_path_count(&self) -> u64 {
365        self.cold_path_count.load(AtomicOrdering::Relaxed)
366    }
367
368    /// Get a reference to the underlying sink.
369    pub fn inner(&self) -> &S {
370        &self.sink
371    }
372
373    /// Manually reset the circuit breaker.
374    pub async fn reset_circuit(&self) {
375        self.circuit.reset().await;
376    }
377
378    /// Shutdown the drain task gracefully.
379    pub async fn shutdown(mut self) {
380        self.shutdown.notify_one();
381        if let Some(handle) = self.drain_handle.take() {
382            let _ = handle.await;
383        }
384    }
385}
386
/// Count existing items and sum payload bytes in a yaque queue directory.
///
/// yaque stores messages as `[4-byte Hamming header][payload]` in segment files
/// named `<n>.q`. The receiver position is persisted in `recv-metadata` as two
/// big-endian `u64` values (segment, byte offset).
///
/// Segments are streamed: only the 4-byte headers are read and payloads are
/// skipped with relative seeks, so memory use does not grow with segment
/// size. All offset arithmetic is checked, so a corrupt `recv-metadata`
/// offset yields "nothing unread" for that segment instead of a panic.
///
/// A record whose payload runs past the end of its file is not counted.
/// Unreadable files and a missing or short `recv-metadata` are ignored.
///
/// Returns `(item_count, payload_bytes)`; `(0, 0)` if `path` is not a directory.
fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
    use std::io::{Read, Seek, SeekFrom};

    // Header value that marks the end of a segment.
    const HEADER_EOF: [u8; 4] = [0xFF; 4];
    // The low 26 bits of the header carry the payload length.
    const PAYLOAD_LEN_MASK: u32 = 0x03_FF_FF_FF;

    if !path.is_dir() {
        return (0, 0);
    }

    // Receiver position; defaults to the very beginning when unavailable.
    let (recv_segment, recv_position) = std::fs::read(path.join("recv-metadata"))
        .ok()
        .and_then(|raw| {
            let mut segment = [0u8; 8];
            let mut position = [0u8; 8];
            segment.copy_from_slice(raw.get(0..8)?);
            position.copy_from_slice(raw.get(8..16)?);
            Some((u64::from_be_bytes(segment), u64::from_be_bytes(position)))
        })
        .unwrap_or((0, 0));

    // Segment numbers at or after the receiver's segment, in queue order.
    let mut segments: Vec<u64> = std::fs::read_dir(path)
        .into_iter()
        .flatten()
        .flatten()
        .map(|entry| entry.path())
        .filter(|file| file.extension().and_then(|ext| ext.to_str()) == Some("q"))
        .filter_map(|file| file.file_stem()?.to_str()?.parse::<u64>().ok())
        .filter(|&seg_num| seg_num >= recv_segment)
        .collect();
    segments.sort_unstable();

    let mut count = 0u64;
    let mut bytes = 0u64;

    for seg_num in segments {
        let Ok(file) = std::fs::File::open(path.join(format!("{seg_num}.q"))) else {
            continue;
        };
        let Ok(file_len) = file.metadata().map(|meta| meta.len()) else {
            continue;
        };

        // Only the receiver's own segment starts mid-file.
        let mut pos = if seg_num == recv_segment {
            recv_position
        } else {
            0
        };
        if pos >= file_len {
            continue;
        }

        let mut reader = std::io::BufReader::new(file);
        if pos > 0 && reader.seek(SeekFrom::Start(pos)).is_err() {
            continue;
        }

        loop {
            let Some(header_end) = pos.checked_add(4).filter(|&end| end <= file_len) else {
                break;
            };
            let mut header = [0u8; 4];
            if reader.read_exact(&mut header).is_err() || header == HEADER_EOF {
                break;
            }

            let payload_len = u32::from_be_bytes(header) & PAYLOAD_LEN_MASK;
            let Some(record_end) = header_end
                .checked_add(u64::from(payload_len))
                .filter(|&end| end <= file_len)
            else {
                // Truncated trailing record: not counted.
                break;
            };

            count += 1;
            bytes += u64::from(payload_len);

            if reader.seek_relative(i64::from(payload_len)).is_err() {
                break;
            }
            pos = record_end;
        }
    }

    (count, bytes)
}
470
471/// Check available disk space using `statvfs`.
472///
473/// Returns `(total_bytes, available_bytes)` for the filesystem containing `path`.
474///
475/// # Safety
476///
477/// Calls `libc::statvfs` which is unsafe but well-defined when given a valid path.
478#[allow(unsafe_code)]
479fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
480    use std::ffi::CString;
481    let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
482
483    // SAFETY: zeroed statvfs is a valid initialisation for the struct
484    // before passing to libc::statvfs which fills all fields.
485    // c_path is a valid null-terminated C string pointing to an existing
486    // filesystem path, and stat is a properly-sized statvfs struct.
487    #[allow(unsafe_code)]
488    let stat = unsafe {
489        let mut stat: libc::statvfs = std::mem::zeroed();
490        let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
491        if result != 0 {
492            return None;
493        }
494        stat
495    };
496
497    // Portability: libc::statvfs field widths differ across platforms.
498    // Linux: f_blocks/f_bavail/f_frsize are u64. aarch64-apple-darwin: u32.
499    // Cast on macOS to bridge the type difference; no-op on Linux. Fixes #39.
500    #[cfg(target_os = "macos")]
501    {
502        let block_size: u64 = u64::from(stat.f_frsize);
503        let total: u64 = u64::from(stat.f_blocks) * block_size;
504        let available: u64 = u64::from(stat.f_bavail) * block_size;
505        Some((total, available))
506    }
507    #[cfg(not(target_os = "macos"))]
508    {
509        let block_size = stat.f_frsize;
510        let total = stat.f_blocks * block_size;
511        let available = stat.f_bavail * block_size;
512        Some((total, available))
513    }
514}
515
516/// Background poller that checks disk usage and sets a flag.
517async fn disk_capacity_poller(
518    spool_path: std::path::PathBuf,
519    disk_available: Arc<std::sync::atomic::AtomicBool>,
520    max_usage_percent: f64,
521    poll_interval: std::time::Duration,
522    shutdown: Arc<Notify>,
523) {
524    loop {
525        tokio::select! {
526            () = shutdown.notified() => {
527                #[cfg(feature = "tracing")]
528                tracing::debug!("Disk capacity poller shutting down");
529                return;
530            }
531            () = tokio::time::sleep(poll_interval) => {}
532        }
533
534        let disk_space = check_disk_space(&spool_path);
535
536        #[cfg(feature = "metrics")]
537        if let Some((total, avail)) = disk_space {
538            ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
539            ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
540        }
541
542        let available = disk_space.is_none_or(|(total, avail)| {
543            if total == 0 {
544                return true;
545            }
546            let used_ratio = 1.0 - (avail as f64 / total as f64);
547            let ok = used_ratio < max_usage_percent;
548            #[cfg(feature = "tracing")]
549            if !ok {
550                tracing::warn!(
551                    used_percent = format!("{:.1}%", used_ratio * 100.0),
552                    threshold = format!("{:.1}%", max_usage_percent * 100.0),
553                    "Disk usage exceeds threshold, pausing spool writes"
554                );
555            }
556            ok
557        });
558
559        disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
560    }
561}
562
563impl<S: Sink> Drop for TieredSink<S> {
564    fn drop(&mut self) {
565        // Codex F17: durability requires explicit `shutdown().await`.
566        // Drop can only notify the background drainer -- it can't
567        // await it. If anything's still spooled at drop time, the
568        // caller has skipped the explicit shutdown and risks losing
569        // that data. Warn + count so this shows up in dashboards.
570        let pending = self.spool_count.load(AtomicOrdering::Relaxed);
571        if pending > 0 {
572            #[cfg(feature = "metrics")]
573            ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
574            #[cfg(feature = "tracing")]
575            tracing::warn!(
576                pending,
577                "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
578            );
579        }
580        self.shutdown.notify_one();
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use std::sync::atomic::AtomicBool;
588    use tempfile::tempdir;
589
590    #[derive(Debug)]
591    struct TestError(String);
592
593    impl std::fmt::Display for TestError {
594        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
595            write!(f, "{}", self.0)
596        }
597    }
598
599    impl std::error::Error for TestError {}
600
601    struct TestSink {
602        available: AtomicBool,
603        received: Mutex<Vec<Vec<u8>>>,
604    }
605
606    impl TestSink {
607        fn new() -> Self {
608            Self {
609                available: AtomicBool::new(true),
610                received: Mutex::new(Vec::new()),
611            }
612        }
613
614        fn set_available(&self, available: bool) {
615            self.available.store(available, AtomicOrdering::SeqCst);
616        }
617
618        async fn received_count(&self) -> usize {
619            self.received.lock().await.len()
620        }
621    }
622
623    impl Sink for TestSink {
624        type Error = TestError;
625
626        async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
627            if self.available.load(AtomicOrdering::SeqCst) {
628                self.received.lock().await.push(data.to_vec());
629                Ok(())
630            } else {
631                Err(SinkError::Unavailable)
632            }
633        }
634    }
635
636    #[tokio::test]
637    async fn test_hot_path_when_available() {
638        let dir = tempdir().unwrap();
639        let spool_path = dir.path().join("test-queue");
640
641        let sink = TestSink::new();
642        let config = TieredSinkConfig::new(&spool_path);
643
644        let tiered = TieredSink::new(sink, config).await.unwrap();
645
646        tiered.send(b"hello").await.unwrap();
647
648        assert_eq!(tiered.hot_path_count(), 1);
649        assert_eq!(tiered.cold_path_count(), 0);
650        assert!(tiered.spool_is_empty().await);
651        assert_eq!(tiered.inner().received_count().await, 1);
652
653        tiered.shutdown().await;
654    }
655
656    #[tokio::test]
657    async fn test_cold_path_when_unavailable() {
658        let dir = tempdir().unwrap();
659        let spool_path = dir.path().join("test-queue");
660
661        let sink = TestSink::new();
662        sink.set_available(false);
663
664        let mut config = TieredSinkConfig::new(&spool_path);
665        config.circuit_failure_threshold = 1; // Open circuit quickly
666
667        let tiered = TieredSink::new(sink, config).await.unwrap();
668
669        // First message triggers circuit open
670        tiered.send(b"hello").await.unwrap();
671
672        // Should have spooled
673        assert_eq!(tiered.cold_path_count(), 1);
674        assert!(!tiered.spool_is_empty().await);
675        assert_eq!(tiered.inner().received_count().await, 0);
676
677        tiered.shutdown().await;
678    }
679
680    #[tokio::test]
681    async fn test_circuit_opens_after_failures() {
682        let dir = tempdir().unwrap();
683        let spool_path = dir.path().join("test-queue");
684
685        let sink = TestSink::new();
686        sink.set_available(false);
687
688        let mut config = TieredSinkConfig::new(&spool_path);
689        config.circuit_failure_threshold = 3;
690
691        let tiered = TieredSink::new(sink, config).await.unwrap();
692
693        // First two fail but circuit stays closed
694        tiered.send(b"1").await.unwrap();
695        tiered.send(b"2").await.unwrap();
696        assert_eq!(tiered.circuit_state().await, CircuitState::Closed);
697
698        // Third failure opens circuit
699        tiered.send(b"3").await.unwrap();
700        assert_eq!(tiered.circuit_state().await, CircuitState::Open);
701
702        tiered.shutdown().await;
703    }
704
705    #[tokio::test]
706    async fn test_drain_recovers_messages() {
707        let dir = tempdir().unwrap();
708        let spool_path = dir.path().join("test-queue");
709
710        let sink = TestSink::new();
711        sink.set_available(false);
712
713        let mut config = TieredSinkConfig::new(&spool_path);
714        config.circuit_failure_threshold = 1;
715        config.circuit_reset_timeout_ms = 50;
716        config.drain_interval_ms = 10;
717
718        let tiered = TieredSink::new(sink, config).await.unwrap();
719
720        // Spool a message
721        tiered.send(b"recover me").await.unwrap();
722        assert_eq!(tiered.spool_len().await, 1);
723
724        // Make sink available and wait for drain
725        tiered.inner().set_available(true);
726        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
727
728        // Should have drained
729        assert!(tiered.spool_is_empty().await);
730        assert_eq!(tiered.inner().received_count().await, 1);
731
732        tiered.shutdown().await;
733    }
734
735    #[tokio::test]
736    async fn test_strict_fifo_waits_for_drain() {
737        let dir = tempdir().unwrap();
738        let spool_path = dir.path().join("test-queue");
739
740        let sink = TestSink::new();
741        sink.set_available(false);
742
743        let mut config = TieredSinkConfig::new(&spool_path);
744        config.ordering = OrderingMode::StrictFifo;
745        config.circuit_failure_threshold = 1;
746
747        let tiered = TieredSink::new(sink, config).await.unwrap();
748
749        // First message gets spooled due to unavailable sink
750        tiered.send(b"first message").await.unwrap();
751        assert_eq!(tiered.spool_len().await, 1);
752
753        // Make sink available again
754        tiered.inner().set_available(true);
755        tiered.reset_circuit().await;
756
757        // In StrictFifo mode, new messages should spool while spool is non-empty
758        tiered.send(b"new message").await.unwrap();
759
760        // Should still be 2 messages in spool (strict FIFO queues new messages behind old)
761        assert_eq!(tiered.spool_len().await, 2);
762        assert_eq!(tiered.cold_path_count(), 2);
763
764        tiered.shutdown().await;
765    }
766
767    #[tokio::test]
768    async fn test_max_spool_bytes_enforced() {
769        let dir = tempdir().unwrap();
770        let spool_path = dir.path().join("test-bytes-limit");
771
772        let sink = TestSink::new();
773        sink.set_available(false);
774
775        let mut config = TieredSinkConfig::new(&spool_path);
776        config.circuit_failure_threshold = 1;
777        // Set a very small byte limit
778        config.max_spool_bytes = Some(50);
779
780        let tiered = TieredSink::new(sink, config).await.unwrap();
781
782        // First message should spool (compressed size fits)
783        tiered.send(b"small").await.unwrap();
784        assert_eq!(tiered.cold_path_count(), 1);
785        assert!(tiered.spool_bytes() > 0);
786
787        // Keep sending until we hit the limit
788        let mut hit_limit = false;
789        for _ in 0..100 {
790            match tiered.send(b"more data here").await {
791                Ok(()) => {}
792                Err(TieredSinkError::SpoolFull(msg)) => {
793                    assert!(msg.contains("max spool bytes"));
794                    hit_limit = true;
795                    break;
796                }
797                Err(e) => panic!("unexpected error: {e}"),
798            }
799        }
800        assert!(hit_limit, "should have hit spool byte limit");
801
802        tiered.shutdown().await;
803    }
804
805    #[tokio::test]
806    async fn test_spool_bytes_decremented_on_drain() {
807        let dir = tempdir().unwrap();
808        let spool_path = dir.path().join("test-bytes-drain");
809
810        let sink = TestSink::new();
811        sink.set_available(false);
812
813        let mut config = TieredSinkConfig::new(&spool_path);
814        config.circuit_failure_threshold = 1;
815        config.circuit_reset_timeout_ms = 50;
816        config.drain_interval_ms = 10;
817
818        let tiered = TieredSink::new(sink, config).await.unwrap();
819
820        // Spool some messages
821        tiered.send(b"drain me").await.unwrap();
822        tiered.send(b"drain me too").await.unwrap();
823        let bytes_after_spool = tiered.spool_bytes();
824        assert!(bytes_after_spool > 0);
825
826        // Make sink available and wait for drain
827        tiered.inner().set_available(true);
828        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
829
830        // Bytes should be decremented
831        assert_eq!(tiered.spool_bytes(), 0);
832        assert!(tiered.spool_is_empty().await);
833
834        tiered.shutdown().await;
835    }
836
837    #[tokio::test]
838    async fn test_spool_count_initialised_from_existing_queue() {
839        let dir = tempdir().unwrap();
840        let spool_path = dir.path().join("test-init-count");
841
842        // Phase 1: Create a TieredSink, spool messages, then drop
843        {
844            let sink = TestSink::new();
845            sink.set_available(false);
846
847            let mut config = TieredSinkConfig::new(&spool_path);
848            config.circuit_failure_threshold = 1;
849
850            let tiered = TieredSink::new(sink, config).await.unwrap();
851
852            tiered.send(b"message 1").await.unwrap();
853            tiered.send(b"message 2").await.unwrap();
854            tiered.send(b"message 3").await.unwrap();
855            assert_eq!(tiered.spool_len().await, 3);
856
857            tiered.shutdown().await;
858        }
859
860        // Phase 2: Re-open -- spool_count should reflect existing items
861        {
862            let sink = TestSink::new();
863            let config = TieredSinkConfig::new(&spool_path);
864            let tiered = TieredSink::new(sink, config).await.unwrap();
865
866            assert_eq!(tiered.spool_len().await, 3);
867            assert!(tiered.spool_bytes() > 0);
868
869            tiered.shutdown().await;
870        }
871    }
872
873    #[tokio::test]
874    async fn test_disk_available_flag() {
875        let dir = tempdir().unwrap();
876        let spool_path = dir.path().join("test-disk-flag");
877
878        let sink = TestSink::new();
879        sink.set_available(false);
880
881        let mut config = TieredSinkConfig::new(&spool_path);
882        config.circuit_failure_threshold = 1;
883
884        let tiered = TieredSink::new(sink, config).await.unwrap();
885
886        // By default, disk should be available
887        assert!(tiered.is_disk_available());
888
889        // Manually set flag to false to simulate full disk
890        tiered.disk_available.store(false, AtomicOrdering::Relaxed);
891
892        let result = tiered.send(b"should fail").await;
893        assert!(matches!(result, Err(TieredSinkError::DiskUnavailable)));
894
895        tiered.shutdown().await;
896    }
897
898    /// C15 regression: N concurrent senders against a small cap.
899    /// Pre-fix overshot; atomic reservation keeps total exact.
900    #[tokio::test]
901    async fn test_max_spool_items_not_overshooting_under_concurrency() {
902        let dir = tempdir().unwrap();
903        let spool_path = dir.path().join("test-toctou-items");
904
905        let sink = TestSink::new();
906        sink.set_available(false);
907
908        let mut config = TieredSinkConfig::new(&spool_path);
909        config.circuit_failure_threshold = 1;
910        config.max_spool_items = Some(10);
911
912        let tiered = Arc::new(TieredSink::new(sink, config).await.unwrap());
913
914        // Fan out 100 concurrent senders against a cap of 10.
915        let mut joins = Vec::new();
916        for _ in 0..100 {
917            let t = Arc::clone(&tiered);
918            joins.push(tokio::spawn(async move { t.send(b"contention").await }));
919        }
920
921        let mut accepted = 0usize;
922        let mut rejected = 0usize;
923        for j in joins {
924            match j.await.unwrap() {
925                Ok(()) => accepted += 1,
926                Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
927                Err(e) => panic!("unexpected error: {e}"),
928            }
929        }
930
931        assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
932        assert_eq!(rejected, 90);
933        assert_eq!(tiered.spool_len().await, 10);
934
935        let tiered = Arc::try_unwrap(tiered)
936            .map_err(|_| "outstanding Arc refs")
937            .unwrap();
938        tiered.shutdown().await;
939    }
940}