1use crate::tiered_sink::{
12 CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13 TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
22pub struct TieredSink<S: Sink> {
28 sink: Arc<S>,
29 spool_sender: Arc<Mutex<Sender>>,
30 #[allow(dead_code)]
32 spool_receiver: Arc<Mutex<Receiver>>,
33 spool_count: Arc<AtomicU64>,
34 spool_bytes: Arc<AtomicU64>,
35 circuit: Arc<CircuitBreaker>,
36 codec: CompressionCodec,
37 config: TieredSinkConfig,
38 shutdown: Arc<Notify>,
39 drain_handle: Option<JoinHandle<()>>,
40 disk_available: Arc<std::sync::atomic::AtomicBool>,
41 #[allow(dead_code)]
42 disk_poller_handle: Option<JoinHandle<()>>,
43
44 fifo_gate: Option<Arc<Mutex<()>>>,
46
47 hot_path_count: AtomicU64,
49 cold_path_count: AtomicU64,
50}
51
52impl<S: Sink> TieredSink<S> {
53 pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59 let (sender, receiver) =
60 yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61 path: config.spool_path.display().to_string(),
62 message: e.to_string(),
63 })?;
64
65 let sink = Arc::new(sink);
66 let spool_sender = Arc::new(Mutex::new(sender));
67 let spool_receiver = Arc::new(Mutex::new(receiver));
68
69 let spool_path_for_scan = config.spool_path.clone();
73 let (initial_count, initial_bytes) =
74 tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75 .await
76 .unwrap_or((0, 0));
77 let spool_count = Arc::new(AtomicU64::new(initial_count));
78 let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80 let circuit = Arc::new(CircuitBreaker::new(
81 config.circuit_failure_threshold,
82 config.circuit_reset_timeout(),
83 ));
84 let shutdown = Arc::new(Notify::new());
85 let codec = config.compression;
86 let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88 let fifo_gate =
91 matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93 let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95 let spool_path = config.spool_path.clone();
96 let disk_flag = Arc::clone(&disk_available);
97 let shutdown_clone = Arc::clone(&shutdown);
98 let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99 let max_usage = disk_cfg.max_usage_percent;
100
101 tokio::spawn(disk_capacity_poller(
102 spool_path,
103 disk_flag,
104 max_usage,
105 poll_interval,
106 shutdown_clone,
107 ))
108 });
109
110 let drain_handle = tokio::spawn(drainer::drain_loop(
112 Arc::clone(&sink),
113 Arc::clone(&spool_receiver),
114 Arc::clone(&spool_count),
115 Arc::clone(&spool_bytes),
116 Arc::clone(&circuit),
117 codec,
118 config.drain_strategy,
119 config.drain_interval(),
120 Arc::clone(&shutdown),
121 fifo_gate.as_ref().map(Arc::clone),
122 ));
123
124 Ok(Self {
125 sink,
126 spool_sender,
127 spool_receiver,
128 spool_count,
129 spool_bytes,
130 circuit,
131 codec,
132 config,
133 shutdown,
134 drain_handle: Some(drain_handle),
135 disk_available,
136 disk_poller_handle,
137 fifo_gate,
138 hot_path_count: AtomicU64::new(0),
139 cold_path_count: AtomicU64::new(0),
140 })
141 }
142
143 pub async fn send(&self, data: &[u8]) -> Result<()> {
158 let _gate = match &self.fifo_gate {
161 Some(gate) => Some(gate.lock().await),
162 None => None,
163 };
164
165 let use_hot_path = self.should_use_hot_path().await;
166
167 if use_hot_path {
168 match self.try_hot_path(data).await {
169 Ok(()) => {
170 self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171 #[cfg(feature = "metrics")]
172 ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173 return Ok(());
174 }
175 Err(TieredSinkError::Sink(_)) => {
176 return Err(TieredSinkError::Sink("fatal sink error".into()));
178 }
179 Err(_) => {
180 }
182 }
183 }
184
185 self.spool_message(data).await?;
187 self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188 #[cfg(feature = "metrics")]
189 ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190 Ok(())
191 }
192
193 async fn should_use_hot_path(&self) -> bool {
195 let circuit_state = self.circuit.state().await;
196
197 #[cfg(feature = "metrics")]
198 ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199 CircuitState::Closed => 0.0,
200 CircuitState::HalfOpen => 1.0,
201 CircuitState::Open => 2.0,
202 });
203
204 match circuit_state {
205 CircuitState::Open => false,
206 CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207 OrderingMode::Interleaved => true,
208 OrderingMode::StrictFifo => {
209 self.spool_count.load(AtomicOrdering::Relaxed) == 0
211 }
212 },
213 }
214 }
215
216 async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218 let send_timeout = self.config.send_timeout_duration();
219
220 match timeout(send_timeout, self.sink.try_send(data)).await {
221 Ok(Ok(())) => {
222 self.circuit.record_success().await;
223 Ok(())
224 }
225 Ok(Err(SinkError::Full)) => {
226 Err(TieredSinkError::Spool("sink full".into()))
228 }
229 Ok(Err(SinkError::Unavailable)) => {
230 self.circuit.record_failure().await;
231 #[cfg(feature = "metrics")]
232 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233 Err(TieredSinkError::Spool("sink unavailable".into()))
234 }
235 Ok(Err(SinkError::Fatal(e))) => {
236 Err(TieredSinkError::Sink(e.to_string()))
238 }
239 Err(_timeout) => {
240 self.circuit.record_failure().await;
241 #[cfg(feature = "metrics")]
242 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243 Err(TieredSinkError::Spool("send timeout".into()))
244 }
245 }
246 }
247
248 async fn spool_message(&self, data: &[u8]) -> Result<()> {
252 if !self.disk_available.load(AtomicOrdering::Relaxed) {
254 return Err(TieredSinkError::DiskUnavailable);
255 }
256
257 let compressed = self.codec.compress(data)?;
258 let compressed_len = compressed.len() as u64;
259
260 if let Some(max_items) = self.config.max_spool_items {
262 let max_items_u64 = max_items as u64;
263 self.spool_count
264 .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
265 if cur < max_items_u64 {
266 Some(cur + 1)
267 } else {
268 None
269 }
270 })
271 .map_err(|_| {
272 TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
273 })?;
274 } else {
275 self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
276 }
277
278 if let Some(max_bytes) = self.config.max_spool_bytes {
280 if let Err(current_bytes) = self.spool_bytes.fetch_update(
281 AtomicOrdering::AcqRel,
282 AtomicOrdering::Acquire,
283 |cur| {
284 cur.checked_add(compressed_len)
285 .filter(|new| *new <= max_bytes)
286 },
287 ) {
288 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
289 return Err(TieredSinkError::SpoolFull(format!(
290 "max spool bytes {max_bytes} reached (current: {current_bytes}, \
291 new message: {compressed_len})"
292 )));
293 }
294 } else {
295 self.spool_bytes
296 .fetch_add(compressed_len, AtomicOrdering::AcqRel);
297 }
298
299 let mut sender = self.spool_sender.lock().await;
301 if let Err(e) = sender.send(compressed).await {
302 drop(sender);
303 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
304 self.spool_bytes
305 .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
306 return Err(TieredSinkError::Spool(e.to_string()));
307 }
308 drop(sender);
309
310 #[cfg(feature = "metrics")]
311 {
312 ::metrics::gauge!("dfe_spool_messages")
313 .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
314 ::metrics::gauge!("dfe_spool_bytes")
315 .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
316 }
317
318 #[cfg(feature = "tracing")]
319 tracing::debug!(
320 spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
321 spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
322 "Message spooled to disk"
323 );
324
325 Ok(())
326 }
327
328 #[allow(clippy::cast_possible_truncation)]
330 pub async fn spool_len(&self) -> usize {
331 self.spool_count.load(AtomicOrdering::Relaxed) as usize
332 }
333
334 pub async fn spool_is_empty(&self) -> bool {
336 self.spool_count.load(AtomicOrdering::Relaxed) == 0
337 }
338
339 #[must_use]
341 pub fn spool_bytes(&self) -> u64 {
342 self.spool_bytes.load(AtomicOrdering::Relaxed)
343 }
344
345 #[must_use]
347 pub fn is_disk_available(&self) -> bool {
348 self.disk_available.load(AtomicOrdering::Relaxed)
349 }
350
351 pub async fn circuit_state(&self) -> CircuitState {
353 self.circuit.state().await
354 }
355
356 #[must_use]
358 pub fn hot_path_count(&self) -> u64 {
359 self.hot_path_count.load(AtomicOrdering::Relaxed)
360 }
361
362 #[must_use]
364 pub fn cold_path_count(&self) -> u64 {
365 self.cold_path_count.load(AtomicOrdering::Relaxed)
366 }
367
368 pub fn inner(&self) -> &S {
370 &self.sink
371 }
372
373 pub async fn reset_circuit(&self) {
375 self.circuit.reset().await;
376 }
377
378 pub async fn shutdown(mut self) {
380 self.shutdown.notify_one();
381 if let Some(handle) = self.drain_handle.take() {
382 let _ = handle.await;
383 }
384 }
385}
386
387fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
394 if !path.is_dir() {
395 return (0, 0);
396 }
397
398 let recv_metadata_path = path.join("recv-metadata");
400 let (recv_segment, recv_position) = if recv_metadata_path.exists() {
401 std::fs::read(&recv_metadata_path)
402 .ok()
403 .and_then(|data| {
404 if data.len() >= 16 {
405 let segment = u64::from_be_bytes(data[0..8].try_into().ok()?);
406 let position = u64::from_be_bytes(data[8..16].try_into().ok()?);
407 Some((segment, position))
408 } else {
409 None
410 }
411 })
412 .unwrap_or((0, 0))
413 } else {
414 (0, 0)
415 };
416
417 let mut segments: Vec<u64> = Vec::new();
419 if let Ok(entries) = std::fs::read_dir(path) {
420 for entry in entries.flatten() {
421 let file_path = entry.path();
422 if file_path.extension().and_then(|e| e.to_str()) == Some("q")
423 && let Some(seg_num) = file_path
424 .file_stem()
425 .and_then(|s| s.to_str())
426 .and_then(|s| s.parse::<u64>().ok())
427 && seg_num >= recv_segment
428 {
429 segments.push(seg_num);
430 }
431 }
432 }
433 segments.sort_unstable();
434
435 let header_eof: [u8; 4] = [255, 255, 255, 255];
436 let mut count = 0u64;
437 let mut bytes = 0u64;
438
439 for &seg_num in &segments {
440 let seg_path = path.join(format!("{seg_num}.q"));
441 let Ok(file_data) = std::fs::read(&seg_path) else {
442 continue;
443 };
444
445 #[allow(clippy::cast_possible_truncation)]
446 let start = if seg_num == recv_segment {
447 recv_position as usize
448 } else {
449 0
450 };
451
452 let mut pos = start;
453 while pos + 4 <= file_data.len() {
454 let header_bytes: [u8; 4] = file_data[pos..pos + 4].try_into().unwrap_or([0; 4]);
455 if header_bytes == header_eof {
456 break;
457 }
458 let encoded = u32::from_be_bytes(header_bytes);
459 let payload_len = (encoded & 0x03_FF_FF_FF) as usize;
460 pos += 4 + payload_len;
461 if pos <= file_data.len() {
462 count += 1;
463 bytes += payload_len as u64;
464 }
465 }
466 }
467
468 (count, bytes)
469}
470
471#[allow(unsafe_code)]
479fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
480 use std::ffi::CString;
481 let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
482
483 #[allow(unsafe_code)]
488 let stat = unsafe {
489 let mut stat: libc::statvfs = std::mem::zeroed();
490 let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
491 if result != 0 {
492 return None;
493 }
494 stat
495 };
496
497 #[cfg(target_os = "macos")]
501 {
502 let block_size: u64 = u64::from(stat.f_frsize);
503 let total: u64 = u64::from(stat.f_blocks) * block_size;
504 let available: u64 = u64::from(stat.f_bavail) * block_size;
505 Some((total, available))
506 }
507 #[cfg(not(target_os = "macos"))]
508 {
509 let block_size = stat.f_frsize;
510 let total = stat.f_blocks * block_size;
511 let available = stat.f_bavail * block_size;
512 Some((total, available))
513 }
514}
515
516async fn disk_capacity_poller(
518 spool_path: std::path::PathBuf,
519 disk_available: Arc<std::sync::atomic::AtomicBool>,
520 max_usage_percent: f64,
521 poll_interval: std::time::Duration,
522 shutdown: Arc<Notify>,
523) {
524 loop {
525 tokio::select! {
526 () = shutdown.notified() => {
527 #[cfg(feature = "tracing")]
528 tracing::debug!("Disk capacity poller shutting down");
529 return;
530 }
531 () = tokio::time::sleep(poll_interval) => {}
532 }
533
534 let disk_space = check_disk_space(&spool_path);
535
536 #[cfg(feature = "metrics")]
537 if let Some((total, avail)) = disk_space {
538 ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
539 ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
540 }
541
542 let available = disk_space.is_none_or(|(total, avail)| {
543 if total == 0 {
544 return true;
545 }
546 let used_ratio = 1.0 - (avail as f64 / total as f64);
547 let ok = used_ratio < max_usage_percent;
548 #[cfg(feature = "tracing")]
549 if !ok {
550 tracing::warn!(
551 used_percent = format!("{:.1}%", used_ratio * 100.0),
552 threshold = format!("{:.1}%", max_usage_percent * 100.0),
553 "Disk usage exceeds threshold, pausing spool writes"
554 );
555 }
556 ok
557 });
558
559 disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
560 }
561}
562
563impl<S: Sink> Drop for TieredSink<S> {
564 fn drop(&mut self) {
565 let pending = self.spool_count.load(AtomicOrdering::Relaxed);
571 if pending > 0 {
572 #[cfg(feature = "metrics")]
573 ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
574 #[cfg(feature = "tracing")]
575 tracing::warn!(
576 pending,
577 "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
578 );
579 }
580 self.shutdown.notify_one();
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use std::sync::atomic::AtomicBool;
588 use tempfile::tempdir;
589
590 #[derive(Debug)]
591 struct TestError(String);
592
593 impl std::fmt::Display for TestError {
594 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
595 write!(f, "{}", self.0)
596 }
597 }
598
599 impl std::error::Error for TestError {}
600
601 struct TestSink {
602 available: AtomicBool,
603 received: Mutex<Vec<Vec<u8>>>,
604 }
605
606 impl TestSink {
607 fn new() -> Self {
608 Self {
609 available: AtomicBool::new(true),
610 received: Mutex::new(Vec::new()),
611 }
612 }
613
614 fn set_available(&self, available: bool) {
615 self.available.store(available, AtomicOrdering::SeqCst);
616 }
617
618 async fn received_count(&self) -> usize {
619 self.received.lock().await.len()
620 }
621 }
622
623 impl Sink for TestSink {
624 type Error = TestError;
625
626 async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
627 if self.available.load(AtomicOrdering::SeqCst) {
628 self.received.lock().await.push(data.to_vec());
629 Ok(())
630 } else {
631 Err(SinkError::Unavailable)
632 }
633 }
634 }
635
636 #[tokio::test]
637 async fn test_hot_path_when_available() {
638 let dir = tempdir().unwrap();
639 let spool_path = dir.path().join("test-queue");
640
641 let sink = TestSink::new();
642 let config = TieredSinkConfig::new(&spool_path);
643
644 let tiered = TieredSink::new(sink, config).await.unwrap();
645
646 tiered.send(b"hello").await.unwrap();
647
648 assert_eq!(tiered.hot_path_count(), 1);
649 assert_eq!(tiered.cold_path_count(), 0);
650 assert!(tiered.spool_is_empty().await);
651 assert_eq!(tiered.inner().received_count().await, 1);
652
653 tiered.shutdown().await;
654 }
655
656 #[tokio::test]
657 async fn test_cold_path_when_unavailable() {
658 let dir = tempdir().unwrap();
659 let spool_path = dir.path().join("test-queue");
660
661 let sink = TestSink::new();
662 sink.set_available(false);
663
664 let mut config = TieredSinkConfig::new(&spool_path);
665 config.circuit_failure_threshold = 1; let tiered = TieredSink::new(sink, config).await.unwrap();
668
669 tiered.send(b"hello").await.unwrap();
671
672 assert_eq!(tiered.cold_path_count(), 1);
674 assert!(!tiered.spool_is_empty().await);
675 assert_eq!(tiered.inner().received_count().await, 0);
676
677 tiered.shutdown().await;
678 }
679
680 #[tokio::test]
681 async fn test_circuit_opens_after_failures() {
682 let dir = tempdir().unwrap();
683 let spool_path = dir.path().join("test-queue");
684
685 let sink = TestSink::new();
686 sink.set_available(false);
687
688 let mut config = TieredSinkConfig::new(&spool_path);
689 config.circuit_failure_threshold = 3;
690
691 let tiered = TieredSink::new(sink, config).await.unwrap();
692
693 tiered.send(b"1").await.unwrap();
695 tiered.send(b"2").await.unwrap();
696 assert_eq!(tiered.circuit_state().await, CircuitState::Closed);
697
698 tiered.send(b"3").await.unwrap();
700 assert_eq!(tiered.circuit_state().await, CircuitState::Open);
701
702 tiered.shutdown().await;
703 }
704
705 #[tokio::test]
706 async fn test_drain_recovers_messages() {
707 let dir = tempdir().unwrap();
708 let spool_path = dir.path().join("test-queue");
709
710 let sink = TestSink::new();
711 sink.set_available(false);
712
713 let mut config = TieredSinkConfig::new(&spool_path);
714 config.circuit_failure_threshold = 1;
715 config.circuit_reset_timeout_ms = 50;
716 config.drain_interval_ms = 10;
717
718 let tiered = TieredSink::new(sink, config).await.unwrap();
719
720 tiered.send(b"recover me").await.unwrap();
722 assert_eq!(tiered.spool_len().await, 1);
723
724 tiered.inner().set_available(true);
726 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
727
728 assert!(tiered.spool_is_empty().await);
730 assert_eq!(tiered.inner().received_count().await, 1);
731
732 tiered.shutdown().await;
733 }
734
735 #[tokio::test]
736 async fn test_strict_fifo_waits_for_drain() {
737 let dir = tempdir().unwrap();
738 let spool_path = dir.path().join("test-queue");
739
740 let sink = TestSink::new();
741 sink.set_available(false);
742
743 let mut config = TieredSinkConfig::new(&spool_path);
744 config.ordering = OrderingMode::StrictFifo;
745 config.circuit_failure_threshold = 1;
746
747 let tiered = TieredSink::new(sink, config).await.unwrap();
748
749 tiered.send(b"first message").await.unwrap();
751 assert_eq!(tiered.spool_len().await, 1);
752
753 tiered.inner().set_available(true);
755 tiered.reset_circuit().await;
756
757 tiered.send(b"new message").await.unwrap();
759
760 assert_eq!(tiered.spool_len().await, 2);
762 assert_eq!(tiered.cold_path_count(), 2);
763
764 tiered.shutdown().await;
765 }
766
767 #[tokio::test]
768 async fn test_max_spool_bytes_enforced() {
769 let dir = tempdir().unwrap();
770 let spool_path = dir.path().join("test-bytes-limit");
771
772 let sink = TestSink::new();
773 sink.set_available(false);
774
775 let mut config = TieredSinkConfig::new(&spool_path);
776 config.circuit_failure_threshold = 1;
777 config.max_spool_bytes = Some(50);
779
780 let tiered = TieredSink::new(sink, config).await.unwrap();
781
782 tiered.send(b"small").await.unwrap();
784 assert_eq!(tiered.cold_path_count(), 1);
785 assert!(tiered.spool_bytes() > 0);
786
787 let mut hit_limit = false;
789 for _ in 0..100 {
790 match tiered.send(b"more data here").await {
791 Ok(()) => {}
792 Err(TieredSinkError::SpoolFull(msg)) => {
793 assert!(msg.contains("max spool bytes"));
794 hit_limit = true;
795 break;
796 }
797 Err(e) => panic!("unexpected error: {e}"),
798 }
799 }
800 assert!(hit_limit, "should have hit spool byte limit");
801
802 tiered.shutdown().await;
803 }
804
805 #[tokio::test]
806 async fn test_spool_bytes_decremented_on_drain() {
807 let dir = tempdir().unwrap();
808 let spool_path = dir.path().join("test-bytes-drain");
809
810 let sink = TestSink::new();
811 sink.set_available(false);
812
813 let mut config = TieredSinkConfig::new(&spool_path);
814 config.circuit_failure_threshold = 1;
815 config.circuit_reset_timeout_ms = 50;
816 config.drain_interval_ms = 10;
817
818 let tiered = TieredSink::new(sink, config).await.unwrap();
819
820 tiered.send(b"drain me").await.unwrap();
822 tiered.send(b"drain me too").await.unwrap();
823 let bytes_after_spool = tiered.spool_bytes();
824 assert!(bytes_after_spool > 0);
825
826 tiered.inner().set_available(true);
828 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
829
830 assert_eq!(tiered.spool_bytes(), 0);
832 assert!(tiered.spool_is_empty().await);
833
834 tiered.shutdown().await;
835 }
836
837 #[tokio::test]
838 async fn test_spool_count_initialised_from_existing_queue() {
839 let dir = tempdir().unwrap();
840 let spool_path = dir.path().join("test-init-count");
841
842 {
844 let sink = TestSink::new();
845 sink.set_available(false);
846
847 let mut config = TieredSinkConfig::new(&spool_path);
848 config.circuit_failure_threshold = 1;
849
850 let tiered = TieredSink::new(sink, config).await.unwrap();
851
852 tiered.send(b"message 1").await.unwrap();
853 tiered.send(b"message 2").await.unwrap();
854 tiered.send(b"message 3").await.unwrap();
855 assert_eq!(tiered.spool_len().await, 3);
856
857 tiered.shutdown().await;
858 }
859
860 {
862 let sink = TestSink::new();
863 let config = TieredSinkConfig::new(&spool_path);
864 let tiered = TieredSink::new(sink, config).await.unwrap();
865
866 assert_eq!(tiered.spool_len().await, 3);
867 assert!(tiered.spool_bytes() > 0);
868
869 tiered.shutdown().await;
870 }
871 }
872
873 #[tokio::test]
874 async fn test_disk_available_flag() {
875 let dir = tempdir().unwrap();
876 let spool_path = dir.path().join("test-disk-flag");
877
878 let sink = TestSink::new();
879 sink.set_available(false);
880
881 let mut config = TieredSinkConfig::new(&spool_path);
882 config.circuit_failure_threshold = 1;
883
884 let tiered = TieredSink::new(sink, config).await.unwrap();
885
886 assert!(tiered.is_disk_available());
888
889 tiered.disk_available.store(false, AtomicOrdering::Relaxed);
891
892 let result = tiered.send(b"should fail").await;
893 assert!(matches!(result, Err(TieredSinkError::DiskUnavailable)));
894
895 tiered.shutdown().await;
896 }
897
898 #[tokio::test]
901 async fn test_max_spool_items_not_overshooting_under_concurrency() {
902 let dir = tempdir().unwrap();
903 let spool_path = dir.path().join("test-toctou-items");
904
905 let sink = TestSink::new();
906 sink.set_available(false);
907
908 let mut config = TieredSinkConfig::new(&spool_path);
909 config.circuit_failure_threshold = 1;
910 config.max_spool_items = Some(10);
911
912 let tiered = Arc::new(TieredSink::new(sink, config).await.unwrap());
913
914 let mut joins = Vec::new();
916 for _ in 0..100 {
917 let t = Arc::clone(&tiered);
918 joins.push(tokio::spawn(async move { t.send(b"contention").await }));
919 }
920
921 let mut accepted = 0usize;
922 let mut rejected = 0usize;
923 for j in joins {
924 match j.await.unwrap() {
925 Ok(()) => accepted += 1,
926 Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
927 Err(e) => panic!("unexpected error: {e}"),
928 }
929 }
930
931 assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
932 assert_eq!(rejected, 90);
933 assert_eq!(tiered.spool_len().await, 10);
934
935 let tiered = Arc::try_unwrap(tiered)
936 .map_err(|_| "outstanding Arc refs")
937 .unwrap();
938 tiered.shutdown().await;
939 }
940}