1use crate::tiered_sink::{
12 CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13 TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
22pub struct TieredSink<S: Sink> {
28 sink: Arc<S>,
29 spool_sender: Arc<Mutex<Sender>>,
30 #[allow(dead_code)]
32 spool_receiver: Arc<Mutex<Receiver>>,
33 spool_count: Arc<AtomicU64>,
34 spool_bytes: Arc<AtomicU64>,
35 circuit: Arc<CircuitBreaker>,
36 codec: CompressionCodec,
37 config: TieredSinkConfig,
38 shutdown: Arc<Notify>,
39 drain_handle: Option<JoinHandle<()>>,
40 disk_available: Arc<std::sync::atomic::AtomicBool>,
41 #[allow(dead_code)]
42 disk_poller_handle: Option<JoinHandle<()>>,
43
44 fifo_gate: Option<Arc<Mutex<()>>>,
46
47 hot_path_count: AtomicU64,
49 cold_path_count: AtomicU64,
50}
51
52impl<S: Sink> TieredSink<S> {
53 pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59 let (sender, receiver) =
60 yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61 path: config.spool_path.display().to_string(),
62 message: e.to_string(),
63 })?;
64
65 let sink = Arc::new(sink);
66 let spool_sender = Arc::new(Mutex::new(sender));
67 let spool_receiver = Arc::new(Mutex::new(receiver));
68
69 let spool_path_for_scan = config.spool_path.clone();
73 let (initial_count, initial_bytes) =
74 tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75 .await
76 .unwrap_or((0, 0));
77 let spool_count = Arc::new(AtomicU64::new(initial_count));
78 let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80 let circuit = Arc::new(CircuitBreaker::new(
81 config.circuit_failure_threshold,
82 config.circuit_reset_timeout(),
83 ));
84 let shutdown = Arc::new(Notify::new());
85 let codec = config.compression;
86 let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88 let fifo_gate =
91 matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93 let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95 let spool_path = config.spool_path.clone();
96 let disk_flag = Arc::clone(&disk_available);
97 let shutdown_clone = Arc::clone(&shutdown);
98 let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99 let max_usage = disk_cfg.max_usage_percent;
100
101 tokio::spawn(disk_capacity_poller(
102 spool_path,
103 disk_flag,
104 max_usage,
105 poll_interval,
106 shutdown_clone,
107 ))
108 });
109
110 let drain_handle = tokio::spawn(drainer::drain_loop(
112 Arc::clone(&sink),
113 Arc::clone(&spool_receiver),
114 Arc::clone(&spool_count),
115 Arc::clone(&spool_bytes),
116 Arc::clone(&circuit),
117 codec,
118 config.drain_strategy,
119 config.drain_interval(),
120 Arc::clone(&shutdown),
121 fifo_gate.as_ref().map(Arc::clone),
122 ));
123
124 Ok(Self {
125 sink,
126 spool_sender,
127 spool_receiver,
128 spool_count,
129 spool_bytes,
130 circuit,
131 codec,
132 config,
133 shutdown,
134 drain_handle: Some(drain_handle),
135 disk_available,
136 disk_poller_handle,
137 fifo_gate,
138 hot_path_count: AtomicU64::new(0),
139 cold_path_count: AtomicU64::new(0),
140 })
141 }
142
143 pub async fn send(&self, data: &[u8]) -> Result<()> {
158 let _gate = match &self.fifo_gate {
161 Some(gate) => Some(gate.lock().await),
162 None => None,
163 };
164
165 let use_hot_path = self.should_use_hot_path().await;
166
167 if use_hot_path {
168 match self.try_hot_path(data).await {
169 Ok(()) => {
170 self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171 #[cfg(feature = "metrics")]
172 ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173 return Ok(());
174 }
175 Err(TieredSinkError::Sink(_)) => {
176 return Err(TieredSinkError::Sink("fatal sink error".into()));
178 }
179 Err(_) => {
180 }
182 }
183 }
184
185 self.spool_message(data).await?;
187 self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188 #[cfg(feature = "metrics")]
189 ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190 Ok(())
191 }
192
193 async fn should_use_hot_path(&self) -> bool {
195 let circuit_state = self.circuit.state().await;
196
197 #[cfg(feature = "metrics")]
198 ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199 CircuitState::Closed => 0.0,
200 CircuitState::HalfOpen => 1.0,
201 CircuitState::Open => 2.0,
202 });
203
204 match circuit_state {
205 CircuitState::Open => false,
206 CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207 OrderingMode::Interleaved => true,
208 OrderingMode::StrictFifo => {
209 self.spool_count.load(AtomicOrdering::Relaxed) == 0
211 }
212 },
213 }
214 }
215
216 async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218 let send_timeout = self.config.send_timeout_duration();
219
220 match timeout(send_timeout, self.sink.try_send(data)).await {
221 Ok(Ok(())) => {
222 self.circuit.record_success().await;
223 Ok(())
224 }
225 Ok(Err(SinkError::Full)) => {
226 Err(TieredSinkError::Spool("sink full".into()))
228 }
229 Ok(Err(SinkError::Unavailable)) => {
230 self.circuit.record_failure().await;
231 #[cfg(feature = "metrics")]
232 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233 Err(TieredSinkError::Spool("sink unavailable".into()))
234 }
235 Ok(Err(SinkError::Fatal(e))) => {
236 Err(TieredSinkError::Sink(e.to_string()))
238 }
239 Err(_timeout) => {
240 self.circuit.record_failure().await;
241 #[cfg(feature = "metrics")]
242 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243 Err(TieredSinkError::Spool("send timeout".into()))
244 }
245 }
246 }
247
248 async fn spool_message(&self, data: &[u8]) -> Result<()> {
252 if !self.disk_available.load(AtomicOrdering::Relaxed) {
254 return Err(TieredSinkError::DiskUnavailable);
255 }
256
257 let compressed = self.codec.compress(data)?;
258 let compressed_len = compressed.len() as u64;
259
260 if let Some(max_items) = self.config.max_spool_items {
262 let max_items_u64 = max_items as u64;
263 self.spool_count
264 .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
265 if cur < max_items_u64 {
266 Some(cur + 1)
267 } else {
268 None
269 }
270 })
271 .map_err(|_| {
272 TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
273 })?;
274 } else {
275 self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
276 }
277
278 if let Some(max_bytes) = self.config.max_spool_bytes {
280 if let Err(current_bytes) = self.spool_bytes.fetch_update(
281 AtomicOrdering::AcqRel,
282 AtomicOrdering::Acquire,
283 |cur| {
284 cur.checked_add(compressed_len)
285 .filter(|new| *new <= max_bytes)
286 },
287 ) {
288 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
289 return Err(TieredSinkError::SpoolFull(format!(
290 "max spool bytes {max_bytes} reached (current: {current_bytes}, \
291 new message: {compressed_len})"
292 )));
293 }
294 } else {
295 self.spool_bytes
296 .fetch_add(compressed_len, AtomicOrdering::AcqRel);
297 }
298
299 let mut sender = self.spool_sender.lock().await;
301 if let Err(e) = sender.send(compressed).await {
302 drop(sender);
303 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
304 self.spool_bytes
305 .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
306 return Err(TieredSinkError::Spool(e.to_string()));
307 }
308 drop(sender);
309
310 #[cfg(feature = "metrics")]
311 {
312 ::metrics::gauge!("dfe_spool_messages")
313 .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
314 ::metrics::gauge!("dfe_spool_bytes")
315 .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
316 }
317
318 #[cfg(feature = "tracing")]
319 tracing::debug!(
320 spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
321 spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
322 "Message spooled to disk"
323 );
324
325 Ok(())
326 }
327
328 #[allow(clippy::cast_possible_truncation)]
330 pub async fn spool_len(&self) -> usize {
331 self.spool_count.load(AtomicOrdering::Relaxed) as usize
332 }
333
334 pub async fn spool_is_empty(&self) -> bool {
336 self.spool_count.load(AtomicOrdering::Relaxed) == 0
337 }
338
339 #[must_use]
341 pub fn spool_bytes(&self) -> u64 {
342 self.spool_bytes.load(AtomicOrdering::Relaxed)
343 }
344
345 #[must_use]
347 pub fn is_disk_available(&self) -> bool {
348 self.disk_available.load(AtomicOrdering::Relaxed)
349 }
350
351 pub async fn circuit_state(&self) -> CircuitState {
353 self.circuit.state().await
354 }
355
356 #[must_use]
358 pub fn hot_path_count(&self) -> u64 {
359 self.hot_path_count.load(AtomicOrdering::Relaxed)
360 }
361
362 #[must_use]
364 pub fn cold_path_count(&self) -> u64 {
365 self.cold_path_count.load(AtomicOrdering::Relaxed)
366 }
367
368 pub fn inner(&self) -> &S {
370 &self.sink
371 }
372
373 pub async fn reset_circuit(&self) {
375 self.circuit.reset().await;
376 }
377
378 pub async fn shutdown(mut self) {
380 self.shutdown.notify_one();
381 if let Some(handle) = self.drain_handle.take() {
382 let _ = handle.await;
383 }
384 }
385}
386
387fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
394 if !path.is_dir() {
395 return (0, 0);
396 }
397
398 let recv_metadata_path = path.join("recv-metadata");
400 let (recv_segment, recv_position) = if recv_metadata_path.exists() {
401 std::fs::read(&recv_metadata_path)
402 .ok()
403 .and_then(|data| {
404 if data.len() >= 16 {
405 let segment = u64::from_be_bytes(data[0..8].try_into().ok()?);
406 let position = u64::from_be_bytes(data[8..16].try_into().ok()?);
407 Some((segment, position))
408 } else {
409 None
410 }
411 })
412 .unwrap_or((0, 0))
413 } else {
414 (0, 0)
415 };
416
417 let mut segments: Vec<u64> = Vec::new();
419 if let Ok(entries) = std::fs::read_dir(path) {
420 for entry in entries.flatten() {
421 let file_path = entry.path();
422 if file_path.extension().and_then(|e| e.to_str()) == Some("q")
423 && let Some(seg_num) = file_path
424 .file_stem()
425 .and_then(|s| s.to_str())
426 .and_then(|s| s.parse::<u64>().ok())
427 && seg_num >= recv_segment
428 {
429 segments.push(seg_num);
430 }
431 }
432 }
433 segments.sort_unstable();
434
435 let header_eof: [u8; 4] = [255, 255, 255, 255];
436 let mut count = 0u64;
437 let mut bytes = 0u64;
438
439 for &seg_num in &segments {
440 let seg_path = path.join(format!("{seg_num}.q"));
441 let Ok(file_data) = std::fs::read(&seg_path) else {
442 continue;
443 };
444
445 #[allow(clippy::cast_possible_truncation)]
446 let start = if seg_num == recv_segment {
447 recv_position as usize
448 } else {
449 0
450 };
451
452 let mut pos = start;
453 while pos + 4 <= file_data.len() {
454 let header_bytes: [u8; 4] = file_data[pos..pos + 4].try_into().unwrap_or([0; 4]);
455 if header_bytes == header_eof {
456 break;
457 }
458 let encoded = u32::from_be_bytes(header_bytes);
459 let payload_len = (encoded & 0x03_FF_FF_FF) as usize;
460 pos += 4 + payload_len;
461 if pos <= file_data.len() {
462 count += 1;
463 bytes += payload_len as u64;
464 }
465 }
466 }
467
468 (count, bytes)
469}
470
471#[allow(unsafe_code)]
479fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
480 use std::ffi::CString;
481 let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
482
483 #[allow(unsafe_code)]
488 let stat = unsafe {
489 let mut stat: libc::statvfs = std::mem::zeroed();
490 let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
491 if result != 0 {
492 return None;
493 }
494 stat
495 };
496
497 #[cfg(target_os = "macos")]
502 {
503 let block_size: u64 = stat.f_frsize;
504 let total: u64 = u64::from(stat.f_blocks) * block_size;
505 let available: u64 = u64::from(stat.f_bavail) * block_size;
506 Some((total, available))
507 }
508 #[cfg(not(target_os = "macos"))]
509 {
510 let block_size = stat.f_frsize;
511 let total = stat.f_blocks * block_size;
512 let available = stat.f_bavail * block_size;
513 Some((total, available))
514 }
515}
516
517async fn disk_capacity_poller(
519 spool_path: std::path::PathBuf,
520 disk_available: Arc<std::sync::atomic::AtomicBool>,
521 max_usage_percent: f64,
522 poll_interval: std::time::Duration,
523 shutdown: Arc<Notify>,
524) {
525 loop {
526 tokio::select! {
527 () = shutdown.notified() => {
528 #[cfg(feature = "tracing")]
529 tracing::debug!("Disk capacity poller shutting down");
530 return;
531 }
532 () = tokio::time::sleep(poll_interval) => {}
533 }
534
535 let disk_space = check_disk_space(&spool_path);
536
537 #[cfg(feature = "metrics")]
538 if let Some((total, avail)) = disk_space {
539 ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
540 ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
541 }
542
543 let available = disk_space.is_none_or(|(total, avail)| {
544 if total == 0 {
545 return true;
546 }
547 let used_ratio = 1.0 - (avail as f64 / total as f64);
548 let ok = used_ratio < max_usage_percent;
549 #[cfg(feature = "tracing")]
550 if !ok {
551 tracing::warn!(
552 used_percent = format!("{:.1}%", used_ratio * 100.0),
553 threshold = format!("{:.1}%", max_usage_percent * 100.0),
554 "Disk usage exceeds threshold, pausing spool writes"
555 );
556 }
557 ok
558 });
559
560 disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
561 }
562}
563
564impl<S: Sink> Drop for TieredSink<S> {
565 fn drop(&mut self) {
566 let pending = self.spool_count.load(AtomicOrdering::Relaxed);
572 if pending > 0 {
573 #[cfg(feature = "metrics")]
574 ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
575 #[cfg(feature = "tracing")]
576 tracing::warn!(
577 pending,
578 "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
579 );
580 }
581 self.shutdown.notify_one();
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588 use std::sync::atomic::AtomicBool;
589 use tempfile::tempdir;
590
591 #[derive(Debug)]
592 struct TestError(String);
593
594 impl std::fmt::Display for TestError {
595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596 write!(f, "{}", self.0)
597 }
598 }
599
600 impl std::error::Error for TestError {}
601
602 struct TestSink {
603 available: AtomicBool,
604 received: Mutex<Vec<Vec<u8>>>,
605 }
606
607 impl TestSink {
608 fn new() -> Self {
609 Self {
610 available: AtomicBool::new(true),
611 received: Mutex::new(Vec::new()),
612 }
613 }
614
615 fn set_available(&self, available: bool) {
616 self.available.store(available, AtomicOrdering::SeqCst);
617 }
618
619 async fn received_count(&self) -> usize {
620 self.received.lock().await.len()
621 }
622 }
623
624 impl Sink for TestSink {
625 type Error = TestError;
626
627 async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
628 if self.available.load(AtomicOrdering::SeqCst) {
629 self.received.lock().await.push(data.to_vec());
630 Ok(())
631 } else {
632 Err(SinkError::Unavailable)
633 }
634 }
635 }
636
637 #[tokio::test]
638 async fn test_hot_path_when_available() {
639 let dir = tempdir().unwrap();
640 let spool_path = dir.path().join("test-queue");
641
642 let sink = TestSink::new();
643 let config = TieredSinkConfig::new(&spool_path);
644
645 let tiered = TieredSink::new(sink, config).await.unwrap();
646
647 tiered.send(b"hello").await.unwrap();
648
649 assert_eq!(tiered.hot_path_count(), 1);
650 assert_eq!(tiered.cold_path_count(), 0);
651 assert!(tiered.spool_is_empty().await);
652 assert_eq!(tiered.inner().received_count().await, 1);
653
654 tiered.shutdown().await;
655 }
656
657 #[tokio::test]
658 async fn test_cold_path_when_unavailable() {
659 let dir = tempdir().unwrap();
660 let spool_path = dir.path().join("test-queue");
661
662 let sink = TestSink::new();
663 sink.set_available(false);
664
665 let mut config = TieredSinkConfig::new(&spool_path);
666 config.circuit_failure_threshold = 1; let tiered = TieredSink::new(sink, config).await.unwrap();
669
670 tiered.send(b"hello").await.unwrap();
672
673 assert_eq!(tiered.cold_path_count(), 1);
675 assert!(!tiered.spool_is_empty().await);
676 assert_eq!(tiered.inner().received_count().await, 0);
677
678 tiered.shutdown().await;
679 }
680
681 #[tokio::test]
682 async fn test_circuit_opens_after_failures() {
683 let dir = tempdir().unwrap();
684 let spool_path = dir.path().join("test-queue");
685
686 let sink = TestSink::new();
687 sink.set_available(false);
688
689 let mut config = TieredSinkConfig::new(&spool_path);
690 config.circuit_failure_threshold = 3;
691
692 let tiered = TieredSink::new(sink, config).await.unwrap();
693
694 tiered.send(b"1").await.unwrap();
696 tiered.send(b"2").await.unwrap();
697 assert_eq!(tiered.circuit_state().await, CircuitState::Closed);
698
699 tiered.send(b"3").await.unwrap();
701 assert_eq!(tiered.circuit_state().await, CircuitState::Open);
702
703 tiered.shutdown().await;
704 }
705
706 #[tokio::test]
707 async fn test_drain_recovers_messages() {
708 let dir = tempdir().unwrap();
709 let spool_path = dir.path().join("test-queue");
710
711 let sink = TestSink::new();
712 sink.set_available(false);
713
714 let mut config = TieredSinkConfig::new(&spool_path);
715 config.circuit_failure_threshold = 1;
716 config.circuit_reset_timeout_ms = 50;
717 config.drain_interval_ms = 10;
718
719 let tiered = TieredSink::new(sink, config).await.unwrap();
720
721 tiered.send(b"recover me").await.unwrap();
723 assert_eq!(tiered.spool_len().await, 1);
724
725 tiered.inner().set_available(true);
727 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
728
729 assert!(tiered.spool_is_empty().await);
731 assert_eq!(tiered.inner().received_count().await, 1);
732
733 tiered.shutdown().await;
734 }
735
736 #[tokio::test]
737 async fn test_strict_fifo_waits_for_drain() {
738 let dir = tempdir().unwrap();
739 let spool_path = dir.path().join("test-queue");
740
741 let sink = TestSink::new();
742 sink.set_available(false);
743
744 let mut config = TieredSinkConfig::new(&spool_path);
745 config.ordering = OrderingMode::StrictFifo;
746 config.circuit_failure_threshold = 1;
747
748 let tiered = TieredSink::new(sink, config).await.unwrap();
749
750 tiered.send(b"first message").await.unwrap();
752 assert_eq!(tiered.spool_len().await, 1);
753
754 tiered.inner().set_available(true);
756 tiered.reset_circuit().await;
757
758 tiered.send(b"new message").await.unwrap();
760
761 assert_eq!(tiered.spool_len().await, 2);
763 assert_eq!(tiered.cold_path_count(), 2);
764
765 tiered.shutdown().await;
766 }
767
768 #[tokio::test]
769 async fn test_max_spool_bytes_enforced() {
770 let dir = tempdir().unwrap();
771 let spool_path = dir.path().join("test-bytes-limit");
772
773 let sink = TestSink::new();
774 sink.set_available(false);
775
776 let mut config = TieredSinkConfig::new(&spool_path);
777 config.circuit_failure_threshold = 1;
778 config.max_spool_bytes = Some(50);
780
781 let tiered = TieredSink::new(sink, config).await.unwrap();
782
783 tiered.send(b"small").await.unwrap();
785 assert_eq!(tiered.cold_path_count(), 1);
786 assert!(tiered.spool_bytes() > 0);
787
788 let mut hit_limit = false;
790 for _ in 0..100 {
791 match tiered.send(b"more data here").await {
792 Ok(()) => {}
793 Err(TieredSinkError::SpoolFull(msg)) => {
794 assert!(msg.contains("max spool bytes"));
795 hit_limit = true;
796 break;
797 }
798 Err(e) => panic!("unexpected error: {e}"),
799 }
800 }
801 assert!(hit_limit, "should have hit spool byte limit");
802
803 tiered.shutdown().await;
804 }
805
806 #[tokio::test]
807 async fn test_spool_bytes_decremented_on_drain() {
808 let dir = tempdir().unwrap();
809 let spool_path = dir.path().join("test-bytes-drain");
810
811 let sink = TestSink::new();
812 sink.set_available(false);
813
814 let mut config = TieredSinkConfig::new(&spool_path);
815 config.circuit_failure_threshold = 1;
816 config.circuit_reset_timeout_ms = 50;
817 config.drain_interval_ms = 10;
818
819 let tiered = TieredSink::new(sink, config).await.unwrap();
820
821 tiered.send(b"drain me").await.unwrap();
823 tiered.send(b"drain me too").await.unwrap();
824 let bytes_after_spool = tiered.spool_bytes();
825 assert!(bytes_after_spool > 0);
826
827 tiered.inner().set_available(true);
829 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
830
831 assert_eq!(tiered.spool_bytes(), 0);
833 assert!(tiered.spool_is_empty().await);
834
835 tiered.shutdown().await;
836 }
837
838 #[tokio::test]
839 async fn test_spool_count_initialised_from_existing_queue() {
840 let dir = tempdir().unwrap();
841 let spool_path = dir.path().join("test-init-count");
842
843 {
845 let sink = TestSink::new();
846 sink.set_available(false);
847
848 let mut config = TieredSinkConfig::new(&spool_path);
849 config.circuit_failure_threshold = 1;
850
851 let tiered = TieredSink::new(sink, config).await.unwrap();
852
853 tiered.send(b"message 1").await.unwrap();
854 tiered.send(b"message 2").await.unwrap();
855 tiered.send(b"message 3").await.unwrap();
856 assert_eq!(tiered.spool_len().await, 3);
857
858 tiered.shutdown().await;
859 }
860
861 {
863 let sink = TestSink::new();
864 let config = TieredSinkConfig::new(&spool_path);
865 let tiered = TieredSink::new(sink, config).await.unwrap();
866
867 assert_eq!(tiered.spool_len().await, 3);
868 assert!(tiered.spool_bytes() > 0);
869
870 tiered.shutdown().await;
871 }
872 }
873
874 #[tokio::test]
875 async fn test_disk_available_flag() {
876 let dir = tempdir().unwrap();
877 let spool_path = dir.path().join("test-disk-flag");
878
879 let sink = TestSink::new();
880 sink.set_available(false);
881
882 let mut config = TieredSinkConfig::new(&spool_path);
883 config.circuit_failure_threshold = 1;
884
885 let tiered = TieredSink::new(sink, config).await.unwrap();
886
887 assert!(tiered.is_disk_available());
889
890 tiered.disk_available.store(false, AtomicOrdering::Relaxed);
892
893 let result = tiered.send(b"should fail").await;
894 assert!(matches!(result, Err(TieredSinkError::DiskUnavailable)));
895
896 tiered.shutdown().await;
897 }
898
899 #[tokio::test]
902 async fn test_max_spool_items_not_overshooting_under_concurrency() {
903 let dir = tempdir().unwrap();
904 let spool_path = dir.path().join("test-toctou-items");
905
906 let sink = TestSink::new();
907 sink.set_available(false);
908
909 let mut config = TieredSinkConfig::new(&spool_path);
910 config.circuit_failure_threshold = 1;
911 config.max_spool_items = Some(10);
912
913 let tiered = Arc::new(TieredSink::new(sink, config).await.unwrap());
914
915 let mut joins = Vec::new();
917 for _ in 0..100 {
918 let t = Arc::clone(&tiered);
919 joins.push(tokio::spawn(async move { t.send(b"contention").await }));
920 }
921
922 let mut accepted = 0usize;
923 let mut rejected = 0usize;
924 for j in joins {
925 match j.await.unwrap() {
926 Ok(()) => accepted += 1,
927 Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
928 Err(e) => panic!("unexpected error: {e}"),
929 }
930 }
931
932 assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
933 assert_eq!(rejected, 90);
934 assert_eq!(tiered.spool_len().await, 10);
935
936 let tiered = Arc::try_unwrap(tiered)
937 .map_err(|_| "outstanding Arc refs")
938 .unwrap();
939 tiered.shutdown().await;
940 }
941}