//! firq_core/api.rs — public API types for the fair-queuing scheduler.

1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
/// Stable per-tenant fairness key.
///
/// Used to assign a tenant's work to a shard and to its DRR queue.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct TenantKey(u64);

impl TenantKey {
    /// Returns the raw tenant identifier.
    pub fn as_u64(&self) -> u64 {
        let TenantKey(raw) = *self;
        raw
    }
}

impl From<u64> for TenantKey {
    fn from(value: u64) -> Self {
        TenantKey(value)
    }
}
22
/// Scheduling priority for a task; `Normal` is the default level.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
pub enum Priority {
    /// Highest priority level.
    High,
    /// Default priority level.
    #[default]
    Normal,
    /// Lowest priority level.
    Low,
}

impl Priority {
    /// Maps this priority to a stable array index (High=0, Normal=1, Low=2).
    pub fn index(self) -> usize {
        match self {
            Self::High => 0,
            Self::Normal => 1,
            Self::Low => 2,
        }
    }

    /// All priorities, highest first, in the order used for dequeueing.
    pub fn ordered() -> [Priority; 3] {
        [Self::High, Self::Normal, Self::Low]
    }
}
50
/// Enqueued unit of work.
///
/// `T` is the caller-supplied payload type carried through the scheduler.
#[derive(Clone, Debug)]
pub struct Task<T> {
    /// User payload.
    pub payload: T,
    /// Enqueue timestamp used to compute queue time metrics.
    pub enqueue_ts: Instant,
    /// Optional deadline; a task past its deadline may expire before
    /// execution (counted in `SchedulerStats::expired`).
    pub deadline: Option<Instant>,
    /// Priority queue used for dispatch ordering.
    pub priority: Priority,
    /// Cost consumed by DRR when this task is dequeued.
    pub cost: u64,
}
65
/// Dynamic per-tenant quantum provider.
///
/// Maps a tenant key to that tenant's DRR quantum. The closure must be
/// `Send + Sync` so the shared `Arc` can be used across threads.
pub type QuantumProvider = Arc<dyn Fn(TenantKey) -> u64 + Send + Sync>;
68
/// Scheduler runtime configuration.
///
/// Construct via [`Default`] and override individual fields; the `Default`
/// impl in this module documents the baseline values.
#[derive(Clone)]
pub struct SchedulerConfig {
    /// Number of shards used to partition tenant state.
    pub shards: usize,
    /// Global queue capacity across all tenants.
    pub max_global: usize,
    /// Maximum pending live tasks per tenant.
    ///
    /// Cancelled/expired entries are compacted lazily and are not intended to
    /// consume this limit once reclaimed.
    pub max_per_tenant: usize,
    /// Base DRR quantum for tenants without overrides.
    pub quantum: u64,
    /// Static per-tenant DRR quantum overrides.
    pub quantum_by_tenant: HashMap<TenantKey, u64>,
    /// Dynamic per-tenant DRR quantum provider.
    ///
    /// NOTE(review): precedence between this provider and the static
    /// `quantum_by_tenant` overrides is not visible in this file — confirm
    /// in the scheduler implementation.
    pub quantum_provider: Option<QuantumProvider>,
    /// Default backpressure policy.
    pub backpressure: BackpressurePolicy,
    /// Per-tenant backpressure policy overrides.
    pub backpressure_by_tenant: HashMap<TenantKey, BackpressurePolicy>,
    /// Maximum number of tenants tracked in top-talker metrics.
    pub top_tenants_capacity: usize,
}
94
95impl fmt::Debug for SchedulerConfig {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.debug_struct("SchedulerConfig")
98            .field("shards", &self.shards)
99            .field("max_global", &self.max_global)
100            .field("max_per_tenant", &self.max_per_tenant)
101            .field("quantum", &self.quantum)
102            .field("quantum_by_tenant", &self.quantum_by_tenant)
103            .field(
104                "quantum_provider",
105                &self.quantum_provider.as_ref().map(|_| "<fn>"),
106            )
107            .field("backpressure", &self.backpressure)
108            .field("backpressure_by_tenant", &self.backpressure_by_tenant)
109            .field("top_tenants_capacity", &self.top_tenants_capacity)
110            .finish()
111    }
112}
113
114impl Default for SchedulerConfig {
115    fn default() -> Self {
116        Self {
117            shards: 1,
118            max_global: 1000,
119            max_per_tenant: 100,
120            quantum: 10,
121            quantum_by_tenant: HashMap::new(),
122            quantum_provider: None,
123            backpressure: BackpressurePolicy::Reject,
124            backpressure_by_tenant: HashMap::new(),
125            top_tenants_capacity: 10,
126        }
127    }
128}
129
/// Behavior when enqueue capacity limits are hit.
///
/// Derives `PartialEq`/`Eq` (both supported by `Duration`) so policies can
/// be compared in tests and configuration handling, matching the comparison
/// derives on the other plain enums in this module (`CloseMode`,
/// `CancelResult`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// Reject enqueue requests when limits are exceeded.
    Reject,
    /// Drop the oldest task from the same tenant and enqueue the new task.
    DropOldestPerTenant,
    /// Drop the newest task from the same tenant and enqueue the new task.
    DropNewestPerTenant,
    /// Wait for capacity up to `wait`, otherwise reject with timeout.
    Timeout { wait: Duration },
}
142
/// Result of an enqueue operation.
#[derive(Clone, Debug)]
pub enum EnqueueResult {
    /// Task was accepted.
    Enqueued,
    /// Task was rejected by the backpressure policy; the payload says why.
    Rejected(EnqueueRejectReason),
    /// Scheduler is closed and no longer accepts work.
    Closed,
}
153
/// Opaque identifier for a pending task.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct TaskHandle(u64);

impl From<u64> for TaskHandle {
    fn from(value: u64) -> Self {
        TaskHandle(value)
    }
}

impl TaskHandle {
    /// Returns the raw numeric handle.
    pub fn as_u64(&self) -> u64 {
        let TaskHandle(raw) = *self;
        raw
    }
}
170
/// Result of `enqueue_with_handle`.
#[derive(Clone, Debug)]
pub enum EnqueueWithHandleResult {
    /// Task was accepted and assigned a cancel handle.
    Enqueued(TaskHandle),
    /// Task was rejected by the backpressure policy; the payload says why.
    Rejected(EnqueueRejectReason),
    /// Scheduler is closed and no longer accepts work.
    Closed,
}
181
/// Reason for enqueue rejection.
///
/// Unit-only enum, so `Copy`/`PartialEq`/`Eq` are derived for cheap passing
/// and direct comparison, matching `CloseMode` and `CancelResult`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EnqueueRejectReason {
    /// Rejected due to global queue capacity.
    GlobalFull,
    /// Rejected due to per-tenant queue capacity.
    TenantFull,
    /// Rejected because timeout elapsed while waiting for capacity.
    Timeout,
}
192
/// Scheduler shutdown mode.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CloseMode {
    /// Stop accepting new work and wake blocked consumers immediately.
    Immediate,
    /// Stop accepting new work and drain queued tasks before closing.
    Drain,
}
201
/// Result of task cancellation.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CancelResult {
    /// Task was found while still pending and was cancelled.
    Cancelled,
    /// Task handle was not found.
    NotFound,
}
210
/// Result of a dequeue attempt.
#[derive(Clone, Debug)]
pub enum DequeueResult<T> {
    /// A task was returned, along with the tenant it belongs to.
    Task { tenant: TenantKey, task: Task<T> },
    /// No task available at the time of dequeue.
    Empty,
    /// Scheduler has been closed.
    Closed,
}
221
/// Snapshot of scheduler metrics.
///
/// Mixes monotonic totals (`enqueued`, `dequeued`, `expired`, ...) with
/// point-in-time gauges (`queue_len_estimate`, `queue_saturation_ratio`).
#[derive(Clone, Debug, Default)]
pub struct SchedulerStats {
    /// Total accepted enqueues.
    pub enqueued: u64,
    /// Total successfully dequeued tasks.
    pub dequeued: u64,
    /// Total tasks dropped because deadlines expired before dispatch.
    pub expired: u64,
    /// Total tasks dropped by reject/drop backpressure paths.
    pub dropped: u64,
    /// Total enqueue rejections due to global capacity.
    pub rejected_global: u64,
    /// Total enqueue rejections due to per-tenant capacity.
    pub rejected_tenant: u64,
    /// Total enqueue rejections caused by timeout.
    pub timeout_rejected: u64,
    /// Total tasks dropped by replacement policies
    /// (drop-oldest / drop-newest).
    pub dropped_policy: u64,
    /// Current estimated live queue length.
    pub queue_len_estimate: u64,
    /// Configured global queue capacity.
    pub max_global: u64,
    /// `queue_len_estimate / max_global`.
    pub queue_saturation_ratio: f64,
    /// Sum of queue time in nanoseconds for dequeued tasks.
    pub queue_time_sum_ns: u64,
    /// Number of queue time samples.
    pub queue_time_samples: u64,
    /// p95 queue time estimate in nanoseconds.
    pub queue_time_p95_ns: u64,
    /// p99 queue time estimate in nanoseconds.
    pub queue_time_p99_ns: u64,
    /// Histogram buckets for queue time.
    pub queue_time_histogram: Vec<QueueTimeBucket>,
    /// Top talkers by recent observed volume, at most
    /// `SchedulerConfig::top_tenants_capacity` entries.
    pub top_tenants: Vec<TenantCount>,
}
260
/// Single histogram bucket for queue time metrics.
///
/// Two plain `u64`s, so `Copy`/`PartialEq`/`Eq` are derived to allow cheap
/// passing and direct comparison of metric snapshots in tests.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct QueueTimeBucket {
    /// Upper bound of bucket in nanoseconds.
    pub le_ns: u64,
    /// Number of observations in this bucket.
    pub count: u64,
}
269
270/// Tenant counter entry used in top-talker snapshots.
271#[derive(Clone, Debug)]
272pub struct TenantCount {
273    /// Tenant identifier.
274    pub tenant: TenantKey,
275    /// Observed count for this tenant.
276    pub count: u64,
277}