1use crate::tiered_sink::{
12 CircuitBreaker, CircuitState, CompressionCodec, OrderingMode, Result, Sink, SinkError,
13 TieredSinkConfig, TieredSinkError, drainer,
14};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
17use tokio::sync::{Mutex, Notify};
18use tokio::task::JoinHandle;
19use tokio::time::timeout;
20use yaque::{Receiver, Sender};
21
22pub struct TieredSink<S: Sink> {
28 sink: Arc<S>,
29 spool_sender: Arc<Mutex<Sender>>,
30 #[allow(dead_code)]
32 spool_receiver: Arc<Mutex<Receiver>>,
33 spool_count: Arc<AtomicU64>,
34 spool_bytes: Arc<AtomicU64>,
35 circuit: Arc<CircuitBreaker>,
36 codec: CompressionCodec,
37 config: TieredSinkConfig,
38 shutdown: Arc<Notify>,
39 drain_handle: Option<JoinHandle<()>>,
40 disk_available: Arc<std::sync::atomic::AtomicBool>,
41 #[allow(dead_code)]
42 disk_poller_handle: Option<JoinHandle<()>>,
43
44 fifo_gate: Option<Arc<Mutex<()>>>,
46
47 hot_path_count: AtomicU64,
49 cold_path_count: AtomicU64,
50}
51
52impl<S: Sink> TieredSink<S> {
53 pub async fn new(sink: S, config: TieredSinkConfig) -> Result<Self> {
59 let (sender, receiver) =
60 yaque::channel(&config.spool_path).map_err(|e| TieredSinkError::SpoolOpen {
61 path: config.spool_path.display().to_string(),
62 message: e.to_string(),
63 })?;
64
65 let sink = Arc::new(sink);
66 let spool_sender = Arc::new(Mutex::new(sender));
67 let spool_receiver = Arc::new(Mutex::new(receiver));
68
69 let spool_path_for_scan = config.spool_path.clone();
73 let (initial_count, initial_bytes) =
74 tokio::task::spawn_blocking(move || spool_item_count_and_bytes(&spool_path_for_scan))
75 .await
76 .unwrap_or((0, 0));
77 let spool_count = Arc::new(AtomicU64::new(initial_count));
78 let spool_bytes = Arc::new(AtomicU64::new(initial_bytes));
79
80 let circuit = Arc::new(CircuitBreaker::new(
81 config.circuit_failure_threshold,
82 config.circuit_reset_timeout(),
83 ));
84 let shutdown = Arc::new(Notify::new());
85 let codec = config.compression;
86 let disk_available = Arc::new(std::sync::atomic::AtomicBool::new(true));
87
88 let fifo_gate =
91 matches!(config.ordering, OrderingMode::StrictFifo).then(|| Arc::new(Mutex::new(())));
92
93 let disk_poller_handle = config.disk_aware.as_ref().map(|disk_cfg| {
95 let spool_path = config.spool_path.clone();
96 let disk_flag = Arc::clone(&disk_available);
97 let shutdown_clone = Arc::clone(&shutdown);
98 let poll_interval = std::time::Duration::from_secs(disk_cfg.poll_interval_secs);
99 let max_usage = disk_cfg.max_usage_percent;
100
101 tokio::spawn(disk_capacity_poller(
102 spool_path,
103 disk_flag,
104 max_usage,
105 poll_interval,
106 shutdown_clone,
107 ))
108 });
109
110 let drain_handle = tokio::spawn(drainer::drain_loop(
112 Arc::clone(&sink),
113 Arc::clone(&spool_receiver),
114 Arc::clone(&spool_count),
115 Arc::clone(&spool_bytes),
116 Arc::clone(&circuit),
117 codec,
118 config.drain_strategy,
119 config.drain_interval(),
120 Arc::clone(&shutdown),
121 fifo_gate.as_ref().map(Arc::clone),
122 ));
123
124 Ok(Self {
125 sink,
126 spool_sender,
127 spool_receiver,
128 spool_count,
129 spool_bytes,
130 circuit,
131 codec,
132 config,
133 shutdown,
134 drain_handle: Some(drain_handle),
135 disk_available,
136 disk_poller_handle,
137 fifo_gate,
138 hot_path_count: AtomicU64::new(0),
139 cold_path_count: AtomicU64::new(0),
140 })
141 }
142
143 pub async fn send(&self, data: &[u8]) -> Result<()> {
158 let _gate = match &self.fifo_gate {
161 Some(gate) => Some(gate.lock().await),
162 None => None,
163 };
164
165 let use_hot_path = self.should_use_hot_path().await;
166
167 if use_hot_path {
168 match self.try_hot_path(data).await {
169 Ok(()) => {
170 self.hot_path_count.fetch_add(1, AtomicOrdering::Relaxed);
171 #[cfg(feature = "metrics")]
172 ::metrics::counter!("dfe_spool_hot_path_total").increment(1);
173 return Ok(());
174 }
175 Err(TieredSinkError::Sink(_)) => {
176 return Err(TieredSinkError::Sink("fatal sink error".into()));
178 }
179 Err(_) => {
180 }
182 }
183 }
184
185 self.spool_message(data).await?;
187 self.cold_path_count.fetch_add(1, AtomicOrdering::Relaxed);
188 #[cfg(feature = "metrics")]
189 ::metrics::counter!("dfe_spool_cold_path_total").increment(1);
190 Ok(())
191 }
192
193 async fn should_use_hot_path(&self) -> bool {
195 let circuit_state = self.circuit.state().await;
196
197 #[cfg(feature = "metrics")]
198 ::metrics::gauge!("dfe_spool_circuit_state").set(match circuit_state {
199 CircuitState::Closed => 0.0,
200 CircuitState::HalfOpen => 1.0,
201 CircuitState::Open => 2.0,
202 });
203
204 match circuit_state {
205 CircuitState::Open => false,
206 CircuitState::Closed | CircuitState::HalfOpen => match self.config.ordering {
207 OrderingMode::Interleaved => true,
208 OrderingMode::StrictFifo => {
209 self.spool_count.load(AtomicOrdering::Relaxed) == 0
211 }
212 },
213 }
214 }
215
216 async fn try_hot_path(&self, data: &[u8]) -> Result<()> {
218 let send_timeout = self.config.send_timeout_duration();
219
220 match timeout(send_timeout, self.sink.try_send(data)).await {
221 Ok(Ok(())) => {
222 self.circuit.record_success().await;
223 Ok(())
224 }
225 Ok(Err(SinkError::Full)) => {
226 Err(TieredSinkError::Spool("sink full".into()))
228 }
229 Ok(Err(SinkError::Unavailable)) => {
230 self.circuit.record_failure().await;
231 #[cfg(feature = "metrics")]
232 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
233 Err(TieredSinkError::Spool("sink unavailable".into()))
234 }
235 Ok(Err(SinkError::Fatal(e))) => {
236 Err(TieredSinkError::Sink(e.to_string()))
238 }
239 Err(_timeout) => {
240 self.circuit.record_failure().await;
241 #[cfg(feature = "metrics")]
242 ::metrics::counter!("dfe_spool_circuit_trips_total").increment(1);
243 Err(TieredSinkError::Spool("send timeout".into()))
244 }
245 }
246 }
247
248 async fn spool_message(&self, data: &[u8]) -> Result<()> {
252 if !self.disk_available.load(AtomicOrdering::Relaxed) {
254 return Err(TieredSinkError::DiskUnavailable);
255 }
256
257 let compressed = self.codec.compress(data)?;
258 let compressed_len = compressed.len() as u64;
259
260 if let Some(max_items) = self.config.max_spool_items {
262 let max_items_u64 = max_items as u64;
263 self.spool_count
264 .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |cur| {
265 if cur < max_items_u64 {
266 Some(cur + 1)
267 } else {
268 None
269 }
270 })
271 .map_err(|_| {
272 TieredSinkError::SpoolFull(format!("max items {max_items} reached"))
273 })?;
274 } else {
275 self.spool_count.fetch_add(1, AtomicOrdering::AcqRel);
276 }
277
278 if let Some(max_bytes) = self.config.max_spool_bytes {
280 if let Err(current_bytes) = self.spool_bytes.fetch_update(
281 AtomicOrdering::AcqRel,
282 AtomicOrdering::Acquire,
283 |cur| {
284 cur.checked_add(compressed_len)
285 .filter(|new| *new <= max_bytes)
286 },
287 ) {
288 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
289 return Err(TieredSinkError::SpoolFull(format!(
290 "max spool bytes {max_bytes} reached (current: {current_bytes}, \
291 new message: {compressed_len})"
292 )));
293 }
294 } else {
295 self.spool_bytes
296 .fetch_add(compressed_len, AtomicOrdering::AcqRel);
297 }
298
299 let mut sender = self.spool_sender.lock().await;
301 if let Err(e) = sender.send(compressed).await {
302 drop(sender);
303 self.spool_count.fetch_sub(1, AtomicOrdering::AcqRel);
304 self.spool_bytes
305 .fetch_sub(compressed_len, AtomicOrdering::AcqRel);
306 return Err(TieredSinkError::Spool(e.to_string()));
307 }
308 drop(sender);
309
310 #[cfg(feature = "metrics")]
311 {
312 ::metrics::gauge!("dfe_spool_messages")
313 .set(self.spool_count.load(AtomicOrdering::Relaxed) as f64);
314 ::metrics::gauge!("dfe_spool_bytes")
315 .set(self.spool_bytes.load(AtomicOrdering::Relaxed) as f64);
316 ::metrics::counter!("dfe_spool_enqueue_total").increment(1);
318 }
319
320 #[cfg(feature = "tracing")]
321 tracing::debug!(
322 spool_items = self.spool_count.load(AtomicOrdering::Relaxed),
323 spool_bytes = self.spool_bytes.load(AtomicOrdering::Relaxed),
324 "Message spooled to disk"
325 );
326
327 Ok(())
328 }
329
330 #[allow(clippy::cast_possible_truncation)]
332 pub async fn spool_len(&self) -> usize {
333 self.spool_count.load(AtomicOrdering::Relaxed) as usize
334 }
335
336 pub async fn spool_is_empty(&self) -> bool {
338 self.spool_count.load(AtomicOrdering::Relaxed) == 0
339 }
340
341 #[must_use]
343 pub fn spool_bytes(&self) -> u64 {
344 self.spool_bytes.load(AtomicOrdering::Relaxed)
345 }
346
347 #[must_use]
349 pub fn is_disk_available(&self) -> bool {
350 self.disk_available.load(AtomicOrdering::Relaxed)
351 }
352
353 pub async fn circuit_state(&self) -> CircuitState {
355 self.circuit.state().await
356 }
357
358 #[must_use]
360 pub fn hot_path_count(&self) -> u64 {
361 self.hot_path_count.load(AtomicOrdering::Relaxed)
362 }
363
364 #[must_use]
366 pub fn cold_path_count(&self) -> u64 {
367 self.cold_path_count.load(AtomicOrdering::Relaxed)
368 }
369
370 pub fn inner(&self) -> &S {
372 &self.sink
373 }
374
375 pub async fn reset_circuit(&self) {
377 self.circuit.reset().await;
378 }
379
380 pub async fn shutdown(mut self) {
382 self.shutdown.notify_one();
383 if let Some(handle) = self.drain_handle.take() {
384 let _ = handle.await;
385 }
386 }
387}
388
389fn spool_item_count_and_bytes(path: &std::path::Path) -> (u64, u64) {
396 if !path.is_dir() {
397 return (0, 0);
398 }
399
400 let recv_metadata_path = path.join("recv-metadata");
402 let (recv_segment, recv_position) = if recv_metadata_path.exists() {
403 std::fs::read(&recv_metadata_path)
404 .ok()
405 .and_then(|data| {
406 if data.len() >= 16 {
407 let segment = u64::from_be_bytes(data[0..8].try_into().ok()?);
408 let position = u64::from_be_bytes(data[8..16].try_into().ok()?);
409 Some((segment, position))
410 } else {
411 None
412 }
413 })
414 .unwrap_or((0, 0))
415 } else {
416 (0, 0)
417 };
418
419 let mut segments: Vec<u64> = Vec::new();
421 if let Ok(entries) = std::fs::read_dir(path) {
422 for entry in entries.flatten() {
423 let file_path = entry.path();
424 if file_path.extension().and_then(|e| e.to_str()) == Some("q")
425 && let Some(seg_num) = file_path
426 .file_stem()
427 .and_then(|s| s.to_str())
428 .and_then(|s| s.parse::<u64>().ok())
429 && seg_num >= recv_segment
430 {
431 segments.push(seg_num);
432 }
433 }
434 }
435 segments.sort_unstable();
436
437 let header_eof: [u8; 4] = [255, 255, 255, 255];
438 let mut count = 0u64;
439 let mut bytes = 0u64;
440
441 for &seg_num in &segments {
442 let seg_path = path.join(format!("{seg_num}.q"));
443 let Ok(file_data) = std::fs::read(&seg_path) else {
444 continue;
445 };
446
447 #[allow(clippy::cast_possible_truncation)]
448 let start = if seg_num == recv_segment {
449 recv_position as usize
450 } else {
451 0
452 };
453
454 let mut pos = start;
455 while pos + 4 <= file_data.len() {
456 let header_bytes: [u8; 4] = file_data[pos..pos + 4].try_into().unwrap_or([0; 4]);
457 if header_bytes == header_eof {
458 break;
459 }
460 let encoded = u32::from_be_bytes(header_bytes);
461 let payload_len = (encoded & 0x03_FF_FF_FF) as usize;
462 pos += 4 + payload_len;
463 if pos <= file_data.len() {
464 count += 1;
465 bytes += payload_len as u64;
466 }
467 }
468 }
469
470 (count, bytes)
471}
472
473#[allow(unsafe_code)]
481fn check_disk_space(path: &std::path::Path) -> Option<(u64, u64)> {
482 use std::ffi::CString;
483 let c_path = CString::new(path.to_string_lossy().as_bytes()).ok()?;
484
485 #[allow(unsafe_code)]
490 let stat = unsafe {
491 let mut stat: libc::statvfs = std::mem::zeroed();
492 let result = libc::statvfs(c_path.as_ptr(), &raw mut stat);
493 if result != 0 {
494 return None;
495 }
496 stat
497 };
498
499 #[cfg(target_os = "macos")]
504 {
505 let block_size: u64 = stat.f_frsize;
506 let total: u64 = u64::from(stat.f_blocks) * block_size;
507 let available: u64 = u64::from(stat.f_bavail) * block_size;
508 Some((total, available))
509 }
510 #[cfg(not(target_os = "macos"))]
511 {
512 let block_size = stat.f_frsize;
513 let total = stat.f_blocks * block_size;
514 let available = stat.f_bavail * block_size;
515 Some((total, available))
516 }
517}
518
519async fn disk_capacity_poller(
521 spool_path: std::path::PathBuf,
522 disk_available: Arc<std::sync::atomic::AtomicBool>,
523 max_usage_percent: f64,
524 poll_interval: std::time::Duration,
525 shutdown: Arc<Notify>,
526) {
527 loop {
528 tokio::select! {
529 () = shutdown.notified() => {
530 #[cfg(feature = "tracing")]
531 tracing::debug!("Disk capacity poller shutting down");
532 return;
533 }
534 () = tokio::time::sleep(poll_interval) => {}
535 }
536
537 let disk_space = check_disk_space(&spool_path);
538
539 #[cfg(feature = "metrics")]
540 if let Some((total, avail)) = disk_space {
541 ::metrics::gauge!("dfe_spool_disk_available_bytes").set(avail as f64);
542 ::metrics::gauge!("dfe_spool_disk_total_bytes").set(total as f64);
543 if total > 0 {
546 ::metrics::gauge!("dfe_spool_disk_usage_ratio")
547 .set(1.0 - (avail as f64 / total as f64));
548 }
549 }
550
551 let available = disk_space.is_none_or(|(total, avail)| {
552 if total == 0 {
553 return true;
554 }
555 let used_ratio = 1.0 - (avail as f64 / total as f64);
556 let ok = used_ratio < max_usage_percent;
557 #[cfg(feature = "tracing")]
558 if !ok {
559 tracing::warn!(
560 used_percent = format!("{:.1}%", used_ratio * 100.0),
561 threshold = format!("{:.1}%", max_usage_percent * 100.0),
562 "Disk usage exceeds threshold, pausing spool writes"
563 );
564 }
565 ok
566 });
567
568 disk_available.store(available, std::sync::atomic::Ordering::Relaxed);
569 }
570}
571
572impl<S: Sink> Drop for TieredSink<S> {
573 fn drop(&mut self) {
574 let pending = self.spool_count.load(AtomicOrdering::Relaxed);
580 if pending > 0 {
581 #[cfg(feature = "metrics")]
582 ::metrics::counter!("dfe_spool_dropped_without_shutdown_total").increment(1);
583 #[cfg(feature = "tracing")]
584 tracing::warn!(
585 pending,
586 "TieredSink dropped with spooled work pending -- call shutdown().await for durability"
587 );
588 }
589 self.shutdown.notify_one();
590 }
591}
592
593#[cfg(test)]
594mod tests {
595 use super::*;
596 use std::sync::atomic::AtomicBool;
597 use tempfile::tempdir;
598
599 #[derive(Debug)]
600 struct TestError(String);
601
602 impl std::fmt::Display for TestError {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 write!(f, "{}", self.0)
605 }
606 }
607
608 impl std::error::Error for TestError {}
609
610 struct TestSink {
611 available: AtomicBool,
612 received: Mutex<Vec<Vec<u8>>>,
613 }
614
615 impl TestSink {
616 fn new() -> Self {
617 Self {
618 available: AtomicBool::new(true),
619 received: Mutex::new(Vec::new()),
620 }
621 }
622
623 fn set_available(&self, available: bool) {
624 self.available.store(available, AtomicOrdering::SeqCst);
625 }
626
627 async fn received_count(&self) -> usize {
628 self.received.lock().await.len()
629 }
630 }
631
632 impl Sink for TestSink {
633 type Error = TestError;
634
635 async fn try_send(&self, data: &[u8]) -> std::result::Result<(), SinkError<Self::Error>> {
636 if self.available.load(AtomicOrdering::SeqCst) {
637 self.received.lock().await.push(data.to_vec());
638 Ok(())
639 } else {
640 Err(SinkError::Unavailable)
641 }
642 }
643 }
644
645 #[tokio::test]
646 async fn test_hot_path_when_available() {
647 let dir = tempdir().unwrap();
648 let spool_path = dir.path().join("test-queue");
649
650 let sink = TestSink::new();
651 let config = TieredSinkConfig::new(&spool_path);
652
653 let tiered = TieredSink::new(sink, config).await.unwrap();
654
655 tiered.send(b"hello").await.unwrap();
656
657 assert_eq!(tiered.hot_path_count(), 1);
658 assert_eq!(tiered.cold_path_count(), 0);
659 assert!(tiered.spool_is_empty().await);
660 assert_eq!(tiered.inner().received_count().await, 1);
661
662 tiered.shutdown().await;
663 }
664
665 #[tokio::test]
666 async fn test_cold_path_when_unavailable() {
667 let dir = tempdir().unwrap();
668 let spool_path = dir.path().join("test-queue");
669
670 let sink = TestSink::new();
671 sink.set_available(false);
672
673 let mut config = TieredSinkConfig::new(&spool_path);
674 config.circuit_failure_threshold = 1; let tiered = TieredSink::new(sink, config).await.unwrap();
677
678 tiered.send(b"hello").await.unwrap();
680
681 assert_eq!(tiered.cold_path_count(), 1);
683 assert!(!tiered.spool_is_empty().await);
684 assert_eq!(tiered.inner().received_count().await, 0);
685
686 tiered.shutdown().await;
687 }
688
689 #[tokio::test]
690 async fn test_circuit_opens_after_failures() {
691 let dir = tempdir().unwrap();
692 let spool_path = dir.path().join("test-queue");
693
694 let sink = TestSink::new();
695 sink.set_available(false);
696
697 let mut config = TieredSinkConfig::new(&spool_path);
698 config.circuit_failure_threshold = 3;
699
700 let tiered = TieredSink::new(sink, config).await.unwrap();
701
702 tiered.send(b"1").await.unwrap();
704 tiered.send(b"2").await.unwrap();
705 assert_eq!(tiered.circuit_state().await, CircuitState::Closed);
706
707 tiered.send(b"3").await.unwrap();
709 assert_eq!(tiered.circuit_state().await, CircuitState::Open);
710
711 tiered.shutdown().await;
712 }
713
714 #[tokio::test]
715 async fn test_drain_recovers_messages() {
716 let dir = tempdir().unwrap();
717 let spool_path = dir.path().join("test-queue");
718
719 let sink = TestSink::new();
720 sink.set_available(false);
721
722 let mut config = TieredSinkConfig::new(&spool_path);
723 config.circuit_failure_threshold = 1;
724 config.circuit_reset_timeout_ms = 50;
725 config.drain_interval_ms = 10;
726
727 let tiered = TieredSink::new(sink, config).await.unwrap();
728
729 tiered.send(b"recover me").await.unwrap();
731 assert_eq!(tiered.spool_len().await, 1);
732
733 tiered.inner().set_available(true);
735 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
736
737 assert!(tiered.spool_is_empty().await);
739 assert_eq!(tiered.inner().received_count().await, 1);
740
741 tiered.shutdown().await;
742 }
743
744 #[tokio::test]
745 async fn test_strict_fifo_waits_for_drain() {
746 let dir = tempdir().unwrap();
747 let spool_path = dir.path().join("test-queue");
748
749 let sink = TestSink::new();
750 sink.set_available(false);
751
752 let mut config = TieredSinkConfig::new(&spool_path);
753 config.ordering = OrderingMode::StrictFifo;
754 config.circuit_failure_threshold = 1;
755
756 let tiered = TieredSink::new(sink, config).await.unwrap();
757
758 tiered.send(b"first message").await.unwrap();
760 assert_eq!(tiered.spool_len().await, 1);
761
762 tiered.inner().set_available(true);
764 tiered.reset_circuit().await;
765
766 tiered.send(b"new message").await.unwrap();
768
769 assert_eq!(tiered.spool_len().await, 2);
771 assert_eq!(tiered.cold_path_count(), 2);
772
773 tiered.shutdown().await;
774 }
775
776 #[tokio::test]
777 async fn test_max_spool_bytes_enforced() {
778 let dir = tempdir().unwrap();
779 let spool_path = dir.path().join("test-bytes-limit");
780
781 let sink = TestSink::new();
782 sink.set_available(false);
783
784 let mut config = TieredSinkConfig::new(&spool_path);
785 config.circuit_failure_threshold = 1;
786 config.max_spool_bytes = Some(50);
788
789 let tiered = TieredSink::new(sink, config).await.unwrap();
790
791 tiered.send(b"small").await.unwrap();
793 assert_eq!(tiered.cold_path_count(), 1);
794 assert!(tiered.spool_bytes() > 0);
795
796 let mut hit_limit = false;
798 for _ in 0..100 {
799 match tiered.send(b"more data here").await {
800 Ok(()) => {}
801 Err(TieredSinkError::SpoolFull(msg)) => {
802 assert!(msg.contains("max spool bytes"));
803 hit_limit = true;
804 break;
805 }
806 Err(e) => panic!("unexpected error: {e}"),
807 }
808 }
809 assert!(hit_limit, "should have hit spool byte limit");
810
811 tiered.shutdown().await;
812 }
813
814 #[tokio::test]
815 async fn test_spool_bytes_decremented_on_drain() {
816 let dir = tempdir().unwrap();
817 let spool_path = dir.path().join("test-bytes-drain");
818
819 let sink = TestSink::new();
820 sink.set_available(false);
821
822 let mut config = TieredSinkConfig::new(&spool_path);
823 config.circuit_failure_threshold = 1;
824 config.circuit_reset_timeout_ms = 50;
825 config.drain_interval_ms = 10;
826
827 let tiered = TieredSink::new(sink, config).await.unwrap();
828
829 tiered.send(b"drain me").await.unwrap();
831 tiered.send(b"drain me too").await.unwrap();
832 let bytes_after_spool = tiered.spool_bytes();
833 assert!(bytes_after_spool > 0);
834
835 tiered.inner().set_available(true);
837 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
838
839 assert_eq!(tiered.spool_bytes(), 0);
841 assert!(tiered.spool_is_empty().await);
842
843 tiered.shutdown().await;
844 }
845
846 #[tokio::test]
847 async fn test_spool_count_initialised_from_existing_queue() {
848 let dir = tempdir().unwrap();
849 let spool_path = dir.path().join("test-init-count");
850
851 {
853 let sink = TestSink::new();
854 sink.set_available(false);
855
856 let mut config = TieredSinkConfig::new(&spool_path);
857 config.circuit_failure_threshold = 1;
858
859 let tiered = TieredSink::new(sink, config).await.unwrap();
860
861 tiered.send(b"message 1").await.unwrap();
862 tiered.send(b"message 2").await.unwrap();
863 tiered.send(b"message 3").await.unwrap();
864 assert_eq!(tiered.spool_len().await, 3);
865
866 tiered.shutdown().await;
867 }
868
869 {
871 let sink = TestSink::new();
872 let config = TieredSinkConfig::new(&spool_path);
873 let tiered = TieredSink::new(sink, config).await.unwrap();
874
875 assert_eq!(tiered.spool_len().await, 3);
876 assert!(tiered.spool_bytes() > 0);
877
878 tiered.shutdown().await;
879 }
880 }
881
882 #[tokio::test]
883 async fn test_disk_available_flag() {
884 let dir = tempdir().unwrap();
885 let spool_path = dir.path().join("test-disk-flag");
886
887 let sink = TestSink::new();
888 sink.set_available(false);
889
890 let mut config = TieredSinkConfig::new(&spool_path);
891 config.circuit_failure_threshold = 1;
892
893 let tiered = TieredSink::new(sink, config).await.unwrap();
894
895 assert!(tiered.is_disk_available());
897
898 tiered.disk_available.store(false, AtomicOrdering::Relaxed);
900
901 let result = tiered.send(b"should fail").await;
902 assert!(matches!(result, Err(TieredSinkError::DiskUnavailable)));
903
904 tiered.shutdown().await;
905 }
906
907 #[tokio::test]
910 async fn test_max_spool_items_not_overshooting_under_concurrency() {
911 let dir = tempdir().unwrap();
912 let spool_path = dir.path().join("test-toctou-items");
913
914 let sink = TestSink::new();
915 sink.set_available(false);
916
917 let mut config = TieredSinkConfig::new(&spool_path);
918 config.circuit_failure_threshold = 1;
919 config.max_spool_items = Some(10);
920
921 let tiered = Arc::new(TieredSink::new(sink, config).await.unwrap());
922
923 let mut joins = Vec::new();
925 for _ in 0..100 {
926 let t = Arc::clone(&tiered);
927 joins.push(tokio::spawn(async move { t.send(b"contention").await }));
928 }
929
930 let mut accepted = 0usize;
931 let mut rejected = 0usize;
932 for j in joins {
933 match j.await.unwrap() {
934 Ok(()) => accepted += 1,
935 Err(TieredSinkError::SpoolFull(_)) => rejected += 1,
936 Err(e) => panic!("unexpected error: {e}"),
937 }
938 }
939
940 assert_eq!(accepted, 10, "cap must not overshoot (got {accepted})");
941 assert_eq!(rejected, 90);
942 assert_eq!(tiered.spool_len().await, 10);
943
944 let tiered = Arc::try_unwrap(tiered)
945 .map_err(|_| "outstanding Arc refs")
946 .unwrap();
947 tiered.shutdown().await;
948 }
949}