// firq_core/api.rs
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
5
/// Stable fairness key used to assign work to shards and DRR queues.
///
/// This is a thin newtype over a raw `u64` identifier. Build one with
/// `TenantKey::from(raw)` and read the raw value back with
/// [`TenantKey::as_u64`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct TenantKey(u64);

impl From<u64> for TenantKey {
    /// Wraps a raw tenant identifier as-is, without validation.
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl TenantKey {
    /// Returns the raw tenant identifier.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn as_u64(&self) -> u64 {
        self.0
    }
}
22
/// Scheduling priority for a task.
///
/// The default priority is [`Priority::Normal`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
pub enum Priority {
    /// Highest scheduling priority.
    High,
    /// Default scheduling priority.
    #[default]
    Normal,
    /// Lowest scheduling priority.
    Low,
}

impl Priority {
    /// Returns a fixed index for this priority (High=0, Normal=1, Low=2).
    ///
    /// The index of each priority equals its position in
    /// [`Priority::ordered`].
    #[must_use]
    pub const fn index(self) -> usize {
        match self {
            Priority::High => 0,
            Priority::Normal => 1,
            Priority::Low => 2,
        }
    }

    /// Returns priorities in dequeue order: high first, low last.
    #[must_use]
    pub const fn ordered() -> [Priority; 3] {
        [Priority::High, Priority::Normal, Priority::Low]
    }
}
50
51/// Enqueued unit of work.
52#[derive(Clone, Debug)]
53pub struct Task<T> {
54 /// User payload.
55 pub payload: T,
56 /// Enqueue timestamp used to compute queue time metrics.
57 pub enqueue_ts: Instant,
58 /// Optional deadline for expiration before execution.
59 pub deadline: Option<Instant>,
60 /// Priority queue used for dispatch ordering.
61 pub priority: Priority,
62 /// Cost consumed by DRR when this task is dequeued.
63 pub cost: u64,
64}
65
66/// Dynamic per-tenant quantum provider.
67pub type QuantumProvider = Arc<dyn Fn(TenantKey) -> u64 + Send + Sync>;
68
69/// Scheduler runtime configuration.
70#[derive(Clone)]
71pub struct SchedulerConfig {
72 /// Number of shards used to partition tenant state.
73 pub shards: usize,
74 /// Global queue capacity across all tenants.
75 pub max_global: usize,
76 /// Maximum pending live tasks per tenant.
77 ///
78 /// Cancelled/expired entries are compacted lazily and are not intended to
79 /// consume this limit once reclaimed.
80 pub max_per_tenant: usize,
81 /// Base DRR quantum for tenants without overrides.
82 pub quantum: u64,
83 /// Static per-tenant DRR quantum overrides.
84 pub quantum_by_tenant: HashMap<TenantKey, u64>,
85 /// Dynamic per-tenant DRR quantum provider.
86 pub quantum_provider: Option<QuantumProvider>,
87 /// Default backpressure policy.
88 pub backpressure: BackpressurePolicy,
89 /// Per-tenant backpressure policy overrides.
90 pub backpressure_by_tenant: HashMap<TenantKey, BackpressurePolicy>,
91 /// Maximum number of tenants tracked in top-talker metrics.
92 pub top_tenants_capacity: usize,
93}
94
95impl fmt::Debug for SchedulerConfig {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("SchedulerConfig")
98 .field("shards", &self.shards)
99 .field("max_global", &self.max_global)
100 .field("max_per_tenant", &self.max_per_tenant)
101 .field("quantum", &self.quantum)
102 .field("quantum_by_tenant", &self.quantum_by_tenant)
103 .field(
104 "quantum_provider",
105 &self.quantum_provider.as_ref().map(|_| "<fn>"),
106 )
107 .field("backpressure", &self.backpressure)
108 .field("backpressure_by_tenant", &self.backpressure_by_tenant)
109 .field("top_tenants_capacity", &self.top_tenants_capacity)
110 .finish()
111 }
112}
113
114impl Default for SchedulerConfig {
115 fn default() -> Self {
116 Self {
117 shards: 1,
118 max_global: 1000,
119 max_per_tenant: 100,
120 quantum: 10,
121 quantum_by_tenant: HashMap::new(),
122 quantum_provider: None,
123 backpressure: BackpressurePolicy::Reject,
124 backpressure_by_tenant: HashMap::new(),
125 top_tenants_capacity: 10,
126 }
127 }
128}
129
/// Behavior when enqueue capacity limits are hit.
///
/// Policies compare by value; two `Timeout` policies are equal only when
/// their `wait` durations are equal.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// Reject enqueue requests when limits are exceeded.
    Reject,
    /// Drop the oldest task from the same tenant and enqueue the new task.
    DropOldestPerTenant,
    /// Drop the newest task from the same tenant and enqueue the new task.
    DropNewestPerTenant,
    /// Wait for capacity up to `wait`, otherwise reject with timeout.
    Timeout {
        /// Maximum time to wait for capacity.
        wait: Duration,
    },
}
142
143/// Result of an enqueue operation.
144#[derive(Clone, Debug)]
145pub enum EnqueueResult {
146 /// Task was accepted.
147 Enqueued,
148 /// Task was rejected by backpressure policy.
149 Rejected(EnqueueRejectReason),
150 /// Scheduler is closed.
151 Closed,
152}
153
/// Opaque identifier for a pending task.
///
/// This is a thin newtype over a raw `u64`. Build one with
/// `TaskHandle::from(raw)` and read the raw value back with
/// [`TaskHandle::as_u64`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct TaskHandle(u64);

impl TaskHandle {
    /// Returns the raw numeric handle.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn as_u64(&self) -> u64 {
        self.0
    }
}

impl From<u64> for TaskHandle {
    /// Wraps a raw numeric handle as-is, without validation.
    fn from(value: u64) -> Self {
        Self(value)
    }
}
170
171/// Result of `enqueue_with_handle`.
172#[derive(Clone, Debug)]
173pub enum EnqueueWithHandleResult {
174 /// Task was accepted and assigned a cancel handle.
175 Enqueued(TaskHandle),
176 /// Task was rejected by backpressure policy.
177 Rejected(EnqueueRejectReason),
178 /// Scheduler is closed.
179 Closed,
180}
181
/// Reason for enqueue rejection.
///
/// Reasons compare by variant, so callers can test a rejection with `==`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EnqueueRejectReason {
    /// Rejected due to global queue capacity.
    GlobalFull,
    /// Rejected due to per-tenant queue capacity.
    TenantFull,
    /// Rejected because timeout elapsed while waiting for capacity.
    Timeout,
}
192
/// Scheduler shutdown mode.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CloseMode {
    /// Stop accepting and wake blocked consumers immediately.
    Immediate,
    /// Stop accepting and drain queued tasks before closing.
    Drain,
}
201
/// Result of task cancellation.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CancelResult {
    /// Task was found and cancelled.
    Cancelled,
    /// Task handle was not found.
    NotFound,
}
210
211/// Result of a dequeue attempt.
212#[derive(Clone, Debug)]
213pub enum DequeueResult<T> {
214 /// Returned task with its tenant identity.
215 Task { tenant: TenantKey, task: Task<T> },
216 /// No task available at the time of dequeue.
217 Empty,
218 /// Scheduler has been closed.
219 Closed,
220}
221
222/// Snapshot of scheduler metrics.
223#[derive(Clone, Debug, Default)]
224pub struct SchedulerStats {
225 /// Total accepted enqueues.
226 pub enqueued: u64,
227 /// Total successfully dequeued tasks.
228 pub dequeued: u64,
229 /// Total tasks dropped because deadlines expired before dispatch.
230 pub expired: u64,
231 /// Total tasks dropped by reject/drop backpressure paths.
232 pub dropped: u64,
233 /// Total enqueue rejections due to global capacity.
234 pub rejected_global: u64,
235 /// Total enqueue rejections due to per-tenant capacity.
236 pub rejected_tenant: u64,
237 /// Total enqueue rejections caused by timeout.
238 pub timeout_rejected: u64,
239 /// Total tasks dropped by replacement policies.
240 pub dropped_policy: u64,
241 /// Current estimated live queue length.
242 pub queue_len_estimate: u64,
243 /// Configured global queue capacity.
244 pub max_global: u64,
245 /// `queue_len_estimate / max_global`.
246 pub queue_saturation_ratio: f64,
247 /// Sum of queue time in nanoseconds for dequeued tasks.
248 pub queue_time_sum_ns: u64,
249 /// Number of queue time samples.
250 pub queue_time_samples: u64,
251 /// p95 queue time estimate in nanoseconds.
252 pub queue_time_p95_ns: u64,
253 /// p99 queue time estimate in nanoseconds.
254 pub queue_time_p99_ns: u64,
255 /// Histogram buckets for queue time.
256 pub queue_time_histogram: Vec<QueueTimeBucket>,
257 /// Top talkers by recent observed volume.
258 pub top_tenants: Vec<TenantCount>,
259}
260
/// Single histogram bucket for queue time metrics.
///
/// Buckets compare by value (`le_ns` and `count`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct QueueTimeBucket {
    /// Upper bound of bucket in nanoseconds.
    pub le_ns: u64,
    /// Number of observations in this bucket.
    pub count: u64,
}
269
270/// Tenant counter entry used in top-talker snapshots.
271#[derive(Clone, Debug)]
272pub struct TenantCount {
273 /// Tenant identifier.
274 pub tenant: TenantKey,
275 /// Observed count for this tenant.
276 pub count: u64,
277}