1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
//! Deployment selection logic
//!
//! This module contains the core routing logic for selecting
//! the best deployment for a given model.
use super::config::RoutingStrategy;
use super::deployment::{Deployment, DeploymentId};
use super::error::RouterError;
use super::strategy_impl;
use super::unified::Router;
use std::sync::atomic::Ordering::Relaxed;
impl Router {
    /// Returns `true` when the deployment has capacity for one more
    /// concurrent request; an unset `max_parallel_requests` never blocks.
    pub(crate) fn check_parallel_limit(&self, deployment: &Deployment) -> bool {
        deployment.config.max_parallel_requests.map_or(true, |limit| {
            deployment.state.active_requests.load(Relaxed) < limit
        })
    }
    /// Returns `true` when the deployment is under both its RPM and TPM
    /// rate limits; a limit that is not configured never blocks.
    pub(crate) fn check_rate_limit(&self, deployment: &Deployment) -> bool {
        let within_rpm = deployment
            .config
            .rpm_limit
            .map_or(true, |limit| deployment.state.rpm_current.load(Relaxed) < limit);
        let within_tpm = deployment
            .config
            .tpm_limit
            .map_or(true, |limit| deployment.state.tpm_current.load(Relaxed) < limit);
        within_rpm && within_tpm
    }
    /// Select the best deployment for a given model (core routing method)
    ///
    /// # Flow
    ///
    /// 1. Resolve model_name (handle aliases)
    /// 2. Get all deployment IDs for this model
    /// 3. Filter: healthy + not in cooldown + not rate limited
    /// 4. Select based on routing strategy
    /// 5. Increment active_requests counter
    ///
    /// # Errors
    ///
    /// * [`RouterError::ModelNotFound`] — no deployments registered for the model.
    /// * [`RouterError::NoAvailableDeployment`] — all deployments were filtered
    ///   out, or the strategy declined to pick one.
    pub fn select_deployment(&self, model_name: &str) -> Result<DeploymentId, RouterError> {
        // Resolve aliases first so all routing state is keyed by the canonical name.
        let resolved_name = self.resolve_model_name(model_name);
        // Keep the DashMap read guard over the ID list alive while filtering;
        // this avoids cloning the whole Vec up front.
        let ids_guard = self
            .model_index
            .get(&resolved_name)
            .ok_or_else(|| RouterError::ModelNotFound(model_name.to_string()))?;
        if ids_guard.is_empty() {
            return Err(RouterError::ModelNotFound(model_name.to_string()));
        }
        let total_deployments = ids_guard.len();
        // One shared helper so every exclusion is traced with identical fields.
        let trace_excluded = |id: &DeploymentId, reason: &'static str| {
            tracing::trace!(
                deployment_id = id.as_str(),
                model = %resolved_name,
                reason = reason,
                "deployment excluded from routing candidates"
            );
        };
        // Filter: healthy + not in cooldown + within parallel/rate limits.
        let candidates: Vec<DeploymentId> = ids_guard
            .iter()
            .filter(|id| {
                let deployment = match self.deployments.get(id.as_str()) {
                    Some(d) => d,
                    None => return false,
                };
                // Cooldown must be probed before health: is_in_cooldown()
                // downgrades health from Cooldown to Degraded once the
                // cooldown window has expired.
                if deployment.is_in_cooldown() {
                    trace_excluded(id, "in_cooldown");
                    false
                } else if !deployment.is_healthy() {
                    trace_excluded(id, "unhealthy");
                    false
                } else if !self.check_parallel_limit(&deployment) {
                    trace_excluded(id, "parallel_limit_reached");
                    false
                } else if !self.check_rate_limit(&deployment) {
                    trace_excluded(id, "rate_limited");
                    false
                } else {
                    true
                }
            })
            .cloned()
            .collect();
        // Release the model_index read lock before strategy selection.
        drop(ids_guard);
        if candidates.is_empty() {
            tracing::warn!(
                model = %model_name,
                total_deployments = total_deployments,
                "no available deployments after filtering"
            );
            return Err(RouterError::NoAvailableDeployment(model_name.to_string()));
        }
        // Snapshot per-deployment stats once; strategies read the immutable
        // snapshot instead of touching the live maps.
        let contexts = strategy_impl::build_routing_contexts(&candidates, &self.deployments);
        // `candidates` is non-empty here, but a strategy may still return
        // nothing (e.g. degenerate weights), hence the trailing ok_or_else.
        let picked = match self.config.routing_strategy {
            RoutingStrategy::SimpleShuffle => {
                strategy_impl::weighted_random_from_context(&contexts)
            }
            RoutingStrategy::LeastBusy => strategy_impl::least_busy_from_context(&contexts),
            RoutingStrategy::UsageBased => strategy_impl::lowest_usage_from_context(&contexts),
            RoutingStrategy::LatencyBased => {
                strategy_impl::lowest_latency_from_context(&contexts)
            }
            RoutingStrategy::PriorityBased => {
                strategy_impl::lowest_priority_from_context(&contexts)
            }
            RoutingStrategy::RateLimitAware => {
                strategy_impl::rate_limit_aware_from_context(&contexts)
            }
            RoutingStrategy::RoundRobin => strategy_impl::round_robin_from_context(
                &resolved_name,
                &contexts,
                &self.round_robin_counters,
            ),
        }
        .cloned()
        .ok_or_else(|| RouterError::NoAvailableDeployment(model_name.to_string()))?;
        // Book-keeping: one more in-flight request, plus routing metrics.
        if let Some(deployment) = self.deployments.get(&picked) {
            deployment.state.active_requests.fetch_add(1, Relaxed);
        }
        self.provider_selected_count.fetch_add(1, Relaxed);
        self.strategy_used_count.fetch_add(1, Relaxed);
        tracing::debug!(
            model = %model_name,
            strategy = ?self.config.routing_strategy,
            candidate_count = candidates.len(),
            selected_id = %picked,
            "deployment selected for routing"
        );
        Ok(picked)
    }
    /// Release a deployment after request completion
    ///
    /// Decrements the deployment's active_requests counter, saturating at
    /// zero so a spurious double release can never underflow the count.
    pub fn release_deployment(&self, deployment_id: &str) {
        if let Some(deployment) = self.deployments.get(deployment_id) {
            // fetch_update + saturating_sub: atomic decrement clamped at 0.
            let _ = deployment
                .state
                .active_requests
                .fetch_update(Relaxed, Relaxed, |count| Some(count.saturating_sub(1)));
        }
    }
}