1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
//! Write stall controller for backpressure management.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use crate::error::{Error, Result, WriteStallReason};
/// Snapshot of the resource counts that can trigger a write stall.
///
/// A stall is in effect when either count has reached its corresponding limit
/// in `StallThresholds` (the comparison is `count >= limit`).
///
/// The type is a small `Copy` value; it derives `PartialEq`/`Eq` so snapshots
/// can be compared directly, and `Default` yields the all-zero snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StallCounts {
    /// Number of immutable memtables queued for flush.
    pub immutable_memtables: usize,
    /// Number of L0 SSTable files awaiting compaction.
    pub l0_files: usize,
}
/// Limits at which writes start to stall.
///
/// A stall is triggered as soon as a count *reaches* its limit
/// (`count >= limit`), not only once it goes above it. Consequently a limit of
/// `0` stalls unconditionally, which is why this type intentionally has no
/// `Default` implementation.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StallThresholds {
    /// Writes stall once the immutable memtable count is at or above this value.
    pub memtable_limit: usize,
    /// Writes stall once the L0 file count is at or above this value.
    pub l0_file_limit: usize,
}
/// Source of the live resource counts that drive write-stall decisions.
///
/// The controller stores an implementor as `Arc<dyn WriteStallCountProvider>`
/// and queries it on every pass of its stall loop, comparing the returned
/// counts against its configured thresholds.
///
/// The `Send + Sync + 'static` bounds let a single provider be shared behind
/// that `Arc` across threads.
pub trait WriteStallCountProvider: Send + Sync + 'static {
/// Return the current immutable memtable count and L0 file count.
///
/// Called repeatedly while a writer is stalled, so each call should report
/// up-to-date values.
fn get_stall_counts(&self) -> StallCounts;
}
/// Summary of a write stall that a caller waited through.
///
/// Returned by `WriteStallController::check` when the call found a stall
/// condition, waited, and later observed the condition to be clear.
/// Fields are used for diagnostics, testing, and potential future logging/metrics.
#[derive(Debug, Clone)]
// Some fields are not read by non-test code yet; see the doc comment above.
#[allow(dead_code)]
pub struct WriteStallInfo {
/// Which limit caused the stall, as determined when the stall was first detected.
pub reason: WriteStallReason,
/// The resource count observed when the stall was first detected
/// (not the count at the moment the stall cleared).
pub current_value: usize,
/// The configured limit that `current_value` had reached
/// (the stall test is `current_value >= threshold`).
pub threshold: usize,
/// Time elapsed between first detecting the stall and observing it clear.
pub duration: Duration,
}
/// Controller that manages write stall state and signaling.
///
/// Owns its count provider and thresholds, providing a self-contained
/// API for the commit path to check backpressure conditions.
///
/// Writers call `check()` before each write; background work calls
/// `signal_work_done()` after it finishes so that stalled writers re-read the
/// counts, and `signal_shutdown()` makes stalled writers return an error.
pub struct WriteStallController {
/// Wakes stalled writers. Signalled with `notify_waiters()` both when
/// background work completes and on shutdown.
stall_cleared: Notify,
/// Current stall state (for fast-path check).
/// Set to `true` when a `check()` call first detects a stall and back to
/// `false` when a stalled `check()` call returns.
is_stalled: AtomicBool,
/// Shutdown flag - checked in stall loop for graceful exit.
/// Stored with `Release` and loaded with `Acquire`.
shutdown: AtomicBool,
/// Provider for live stall count readings
provider: Arc<dyn WriteStallCountProvider>,
/// Static thresholds configured at startup
thresholds: StallThresholds,
}
impl WriteStallController {
pub fn new(provider: Arc<dyn WriteStallCountProvider>, thresholds: StallThresholds) -> Self {
Self {
stall_cleared: Notify::new(),
is_stalled: AtomicBool::new(false),
shutdown: AtomicBool::new(false),
provider,
thresholds,
}
}
/// Check stall conditions and wait if stalled. Called before each write.
///
/// Returns `Ok(Some(WriteStallInfo))` if was stalled and then cleared,
/// `Ok(None)` if not stalled, `Err(Error::PipelineStall)` on shutdown.
///
/// Re-reads counts each iteration because background work (flushes,
/// compactions) may complete while waiting.
///
/// IMPORTANT: The `Notified` future is created at the start of each loop
/// iteration, BEFORE checking conditions. This ensures we receive any
/// `notify_waiters()` calls that happen after we register but before we
/// check. Per tokio docs: "The Notified future is guaranteed to receive
/// wakeups from notify_waiters() as soon as it has been created."
pub async fn check(&self) -> Result<Option<WriteStallInfo>> {
let mut stall_start: Option<Instant> = None;
let mut stall_reason: Option<WriteStallReason> = None;
let mut stall_value: usize = 0;
let mut stall_threshold: usize = 0;
loop {
// Create Notified FIRST to register for wakeups.
// Any notify_waiters() call after this point will wake us.
let notified = self.stall_cleared.notified();
// Check shutdown
if self.shutdown.load(Ordering::Acquire) {
if stall_reason.is_some() {
self.is_stalled.store(false, Ordering::Release);
}
return Err(Error::PipelineStall);
}
// Re-read counts (now any notify_waiters() after notified creation will wake us)
let counts = self.provider.get_stall_counts();
// Check if NOT stalled - return without awaiting
if counts.immutable_memtables < self.thresholds.memtable_limit
&& counts.l0_files < self.thresholds.l0_file_limit
{
// Not stalled - return result
if let Some(reason) = stall_reason {
self.is_stalled.store(false, Ordering::Release);
let duration = stall_start.map(|s| s.elapsed()).unwrap_or(Duration::ZERO);
log::info!("Write stall cleared after {:?}", duration);
return Ok(Some(WriteStallInfo {
reason,
current_value: stall_value,
threshold: stall_threshold,
duration,
}));
}
return Ok(None);
}
// Stalled - determine which condition triggered it
let (reason, value, threshold) =
if counts.immutable_memtables >= self.thresholds.memtable_limit {
(
WriteStallReason::MemtableLimit,
counts.immutable_memtables,
self.thresholds.memtable_limit,
)
} else {
(WriteStallReason::L0FileLimit, counts.l0_files, self.thresholds.l0_file_limit)
};
// Record stall reason if first time
if stall_reason.is_none() {
stall_reason = Some(reason);
stall_value = value;
stall_threshold = threshold;
stall_start = Some(Instant::now());
self.is_stalled.store(true, Ordering::Release);
log::warn!("Write stall: {:?} ({} >= {})", reason, value, threshold);
}
// Wait
notified.await;
}
}
/// Non-blocking check of whether stall conditions are currently met.
#[allow(dead_code)]
pub fn should_stall(&self) -> bool {
let counts = self.provider.get_stall_counts();
counts.immutable_memtables >= self.thresholds.memtable_limit
|| counts.l0_files >= self.thresholds.l0_file_limit
}
/// Get current counts from the provider (for determining stall reason).
#[allow(dead_code)]
pub fn provider_counts(&self) -> StallCounts {
self.provider.get_stall_counts()
}
/// Get the configured memtable stall limit.
#[allow(dead_code)]
pub fn memtable_limit(&self) -> usize {
self.thresholds.memtable_limit
}
/// Signal that stall conditions may have changed.
/// Called after flush or compaction completes.
pub fn signal_work_done(&self) {
self.stall_cleared.notify_waiters();
}
/// Signal shutdown - wakes all stalled writers to exit.
pub fn signal_shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
self.stall_cleared.notify_waiters();
}
/// Fast check if currently stalled (for metrics).
#[cfg(test)]
pub fn is_stalled(&self) -> bool {
self.is_stalled.load(Ordering::Acquire)
}
}