1use std::collections::{HashMap, VecDeque};
2use std::io::{Read, Seek, SeekFrom};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::OnceLock;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use tokio::sync::RwLock;
12use tokio::time::{Duration, interval};
13
14use crate::lb::LbState;
15use crate::logging::RetryInfo;
16use crate::sessions;
17use crate::usage::UsageMetrics;
18
/// Capacity of the in-memory ring of finished requests.
///
/// Reads `CODEX_HELPER_RECENT_FINISHED_MAX` once per process (cached in a
/// `OnceLock`); values that are missing, unparsable, or zero fall back to
/// 2000, and the result is always clamped into [200, 20_000].
fn recent_finished_max() -> usize {
    static MAX: OnceLock<usize> = OnceLock::new();
    *MAX.get_or_init(|| {
        let configured = std::env::var("CODEX_HELPER_RECENT_FINISHED_MAX")
            .ok()
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            .filter(|&n| n > 0);
        configured.unwrap_or(2_000).clamp(200, 20_000)
    })
}
30
/// Aggregated counters for one rollup dimension (since-start, per day,
/// per config, or per provider). Populated exclusively via `record`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UsageBucket {
    /// Every request folded into this bucket.
    pub requests_total: u64,
    /// Requests that ended with HTTP status >= 400.
    pub requests_error: u64,
    /// Sum of wall-clock durations over all requests (ms).
    pub duration_ms_total: u64,
    /// Requests that carried token-usage metrics.
    pub requests_with_usage: u64,
    /// Duration sum restricted to requests with usage metrics (ms).
    pub duration_ms_with_usage_total: u64,
    /// Estimated generation time (duration minus TTFB when a plausible TTFB
    /// exists, else full duration) over requests with usage metrics (ms).
    pub generation_ms_total: u64,
    /// Sum of time-to-first-byte over requests that reported a TTFB > 0.
    pub ttfb_ms_total: u64,
    /// Number of requests contributing to `ttfb_ms_total`.
    pub ttfb_samples: u64,
    /// Accumulated token usage across requests with usage metrics.
    pub usage: UsageMetrics,
}
43
44impl UsageBucket {
45 fn record(
46 &mut self,
47 status_code: u16,
48 duration_ms: u64,
49 usage: Option<&UsageMetrics>,
50 ttfb_ms: Option<u64>,
51 ) {
52 self.requests_total = self.requests_total.saturating_add(1);
53 if status_code >= 400 {
54 self.requests_error = self.requests_error.saturating_add(1);
55 }
56 self.duration_ms_total = self.duration_ms_total.saturating_add(duration_ms);
57 if let Some(u) = usage {
58 self.usage.add_assign(u);
59 self.requests_with_usage = self.requests_with_usage.saturating_add(1);
60 self.duration_ms_with_usage_total = self
61 .duration_ms_with_usage_total
62 .saturating_add(duration_ms);
63
64 let gen_ms = match ttfb_ms {
65 Some(ttfb) if ttfb > 0 && ttfb < duration_ms => duration_ms.saturating_sub(ttfb),
66 _ => duration_ms,
67 };
68 self.generation_ms_total = self.generation_ms_total.saturating_add(gen_ms);
69 if let Some(ttfb) = ttfb_ms.filter(|v| *v > 0) {
70 self.ttfb_ms_total = self.ttfb_ms_total.saturating_add(ttfb);
71 self.ttfb_samples = self.ttfb_samples.saturating_add(1);
72 }
73 }
74 }
75}
76
/// Serializable snapshot of a `UsageRollup`: internal maps flattened into
/// sorted, truncated vectors for presentation (built by
/// `ProxyState::get_usage_rollup_view`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UsageRollupView {
    /// Aggregate over all requests recorded since process start.
    pub since_start: UsageBucket,
    /// (day-index, bucket) pairs, oldest first; day = ended_at_ms / 86_400_000.
    pub by_day: Vec<(i32, UsageBucket)>,
    /// Top configs by total tokens, descending.
    pub by_config: Vec<(String, UsageBucket)>,
    /// Per-day series for each config present in `by_config`.
    pub by_config_day: HashMap<String, Vec<(i32, UsageBucket)>>,
    /// Top providers by total tokens, descending.
    pub by_provider: Vec<(String, UsageBucket)>,
    /// Per-day series for each provider present in `by_provider`.
    pub by_provider_day: HashMap<String, Vec<(i32, UsageBucket)>>,
}
86
/// Mutable in-memory usage accumulator; the map-based counterpart of
/// `UsageRollupView`. One instance exists per service name.
#[derive(Debug, Clone, Default)]
struct UsageRollup {
    since_start: UsageBucket,
    // Keyed by day index (ended_at_ms / 86_400_000).
    by_day: HashMap<i32, UsageBucket>,
    by_config: HashMap<String, UsageBucket>,
    by_config_day: HashMap<String, HashMap<i32, UsageBucket>>,
    by_provider: HashMap<String, UsageBucket>,
    by_provider_day: HashMap<String, HashMap<i32, UsageBucket>>,
}
96
/// Result of probing a single upstream endpoint.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UpstreamHealth {
    pub base_url: String,
    /// `Some(true)`/`Some(false)` for a verdict, `None` when the probe
    /// produced no verdict (see `record_health_check_result`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ok: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Probe failure message, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
109
/// Accumulated upstream probe results for one config, bounded by
/// `ProxyState::MAX_HEALTH_RECORDS_PER_CONFIG`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct ConfigHealth {
    /// Timestamp of the most recent probe folded in (monotone max).
    pub checked_at_ms: u64,
    #[serde(default)]
    pub upstreams: Vec<UpstreamHealth>,
}
116
/// Read-only view of one upstream's load-balancer state.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LbUpstreamView {
    pub failure_count: u32,
    /// Seconds of cooldown left; `None` when not cooling down (or already
    /// expired at snapshot time).
    pub cooldown_remaining_secs: Option<u64>,
    pub usage_exhausted: bool,
}
123
/// Read-only view of one config's load-balancer state (see `get_lb_view`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LbConfigView {
    /// Index of the last upstream that served successfully, if known.
    pub last_good_index: Option<usize>,
    pub upstreams: Vec<LbUpstreamView>,
}
129
/// Progress/outcome of one health-check run for a config.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct HealthCheckStatus {
    pub started_at_ms: u64,
    /// Last time any field below changed.
    pub updated_at_ms: u64,
    /// Number of upstream probes planned for the run.
    pub total: u32,
    /// Probes recorded so far (including verdict-less ones).
    pub completed: u32,
    pub ok: u32,
    pub err: u32,
    /// Set via `request_cancel_health_check`; observed through
    /// `is_health_check_cancel_requested`.
    pub cancel_requested: bool,
    /// True when the run was finished as canceled.
    pub canceled: bool,
    /// True once the run has ended, successfully or not.
    pub done: bool,
    /// First error message seen during the run, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
144
/// A request currently in flight through the proxy.
///
/// Created by `begin_request`; the routing fields (`config_name`,
/// `provider_id`, `upstream_base_url`) start as `None` and are filled in by
/// `update_request_route` once a route is chosen.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActiveRequest {
    pub id: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    pub service: String,
    pub method: String,
    pub path: String,
    pub started_at_ms: u64,
}
167
/// A completed request: the `ActiveRequest` snapshot merged with the
/// outcome supplied in `FinishRequestParams` (see `finish_request`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FinishedRequest {
    pub id: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    /// Token usage reported for this request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<UsageMetrics>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<RetryInfo>,
    pub service: String,
    pub method: String,
    pub path: String,
    pub status_code: u16,
    pub duration_ms: u64,
    /// Time to first byte, when measured.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    pub ended_at_ms: u64,
}
198
/// Outcome of a request, bundled for `finish_request` to keep the call
/// site readable.
#[derive(Debug, Clone)]
pub struct FinishRequestParams {
    /// Id previously returned by `begin_request`.
    pub id: u64,
    pub status_code: u16,
    pub duration_ms: u64,
    pub ended_at_ms: u64,
    pub usage: Option<UsageMetrics>,
    pub retry: Option<RetryInfo>,
    /// Time to first byte, when measured.
    pub ttfb_ms: Option<u64>,
}
209
/// Rolling per-session statistics, updated on every finished request
/// (see `finish_request`). `last_*` fields keep the previous value when a
/// finished request omits the corresponding datum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionStats {
    pub turns_total: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    /// Sum of usage over turns that reported it.
    pub total_usage: UsageMetrics,
    pub turns_with_usage: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    /// Set to the ended-at time of the latest finished request.
    pub last_seen_ms: u64,
}
233
/// Per-session reasoning-effort override with bookkeeping timestamps.
#[derive(Debug, Clone)]
struct SessionEffortOverride {
    /// The overriding effort value.
    effort: String,
    /// When the override was (re)set.
    #[allow(dead_code)]
    updated_at_ms: u64,
    /// Refreshed by `touch_session_override` whenever the session is seen.
    last_seen_ms: u64,
}
241
/// Per-session config-name override with bookkeeping timestamps.
#[derive(Debug, Clone)]
struct SessionConfigOverride {
    /// The overriding config name.
    config_name: String,
    /// When the override was (re)set.
    #[allow(dead_code)]
    updated_at_ms: u64,
    /// Refreshed by `touch_session_config_override` whenever the session is seen.
    last_seen_ms: u64,
}
249
/// Cached result of a session-id -> cwd lookup. `cwd: None` caches a
/// negative result so repeated misses skip the lookup too.
#[derive(Debug, Clone)]
struct SessionCwdCacheEntry {
    cwd: Option<String>,
    last_seen_ms: u64,
}
255
/// Manual per-config attribute overrides; `None` means "no override" for
/// that attribute.
#[derive(Debug, Clone, Default)]
struct ConfigMetaOverride {
    enabled: Option<bool>,
    // Clamped to 1..=10 by `set_config_level_override`.
    level: Option<u8>,
    #[allow(dead_code)]
    updated_at_ms: u64,
}
263
/// Shared, lock-protected runtime state for the proxy: request tracking,
/// per-session overrides and stats, usage rollups, and health-check
/// bookkeeping. Built once via `new_with_lb_states` and shared as `Arc`.
#[derive(Debug)]
pub struct ProxyState {
    /// Monotonic id source for `begin_request`.
    next_request_id: AtomicU64,
    // TTL for session overrides (ms). NOTE(review): no pruning against this
    // TTL is visible in this file — presumably enforced by a background
    // task elsewhere; confirm.
    session_override_ttl_ms: u64,
    session_cwd_cache_ttl_ms: u64,
    /// 0 disables the cwd cache entirely (see `resolve_session_cwd`).
    session_cwd_cache_max_entries: usize,
    session_effort_overrides: RwLock<HashMap<String, SessionEffortOverride>>,
    session_config_overrides: RwLock<HashMap<String, SessionConfigOverride>>,
    /// Process-wide config override; takes effect independently of the
    /// per-session overrides above.
    global_config_override: RwLock<Option<String>>,
    // Keyed service name -> config name -> override.
    config_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    session_cwd_cache: RwLock<HashMap<String, SessionCwdCacheEntry>>,
    session_stats: RwLock<HashMap<String, SessionStats>>,
    active_requests: RwLock<HashMap<u64, ActiveRequest>>,
    /// Most-recent-first ring, bounded by `recent_finished_max()`.
    recent_finished: RwLock<VecDeque<FinishedRequest>>,
    // Keyed by service name.
    usage_rollups: RwLock<HashMap<String, UsageRollup>>,
    // Keyed service name -> config name.
    config_health: RwLock<HashMap<String, HashMap<String, ConfigHealth>>>,
    health_checks: RwLock<HashMap<String, HashMap<String, HealthCheckStatus>>>,
    /// Shared with the load balancer; snapshotted by `get_lb_view`.
    lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
}
286
287impl ProxyState {
    /// Cap on per-config probe records kept in `config_health`; older
    /// entries are dropped first (see `record_health_check_result`).
    const MAX_HEALTH_RECORDS_PER_CONFIG: usize = 200;
289
    /// Convenience constructor without a shared load-balancer state map.
    #[allow(dead_code)]
    pub fn new() -> Arc<Self> {
        Self::new_with_lb_states(None)
    }
294
295 pub fn new_with_lb_states(
296 lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
297 ) -> Arc<Self> {
298 let ttl_secs = std::env::var("CODEX_HELPER_SESSION_OVERRIDE_TTL_SECS")
299 .ok()
300 .and_then(|s| s.trim().parse::<u64>().ok())
301 .filter(|&n| n > 0)
302 .unwrap_or(30 * 60);
303 let ttl_ms = ttl_secs.saturating_mul(1000);
304
305 let cwd_cache_ttl_secs = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_TTL_SECS")
306 .ok()
307 .and_then(|s| s.trim().parse::<u64>().ok())
308 .unwrap_or(12 * 60 * 60);
309 let cwd_cache_ttl_ms = cwd_cache_ttl_secs.saturating_mul(1000);
310 let cwd_cache_max_entries = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_MAX_ENTRIES")
311 .ok()
312 .and_then(|s| s.trim().parse::<usize>().ok())
313 .unwrap_or(2_000);
314
315 Arc::new(Self {
316 next_request_id: AtomicU64::new(1),
317 session_override_ttl_ms: ttl_ms,
318 session_cwd_cache_ttl_ms: cwd_cache_ttl_ms,
319 session_cwd_cache_max_entries: cwd_cache_max_entries,
320 session_effort_overrides: RwLock::new(HashMap::new()),
321 session_config_overrides: RwLock::new(HashMap::new()),
322 global_config_override: RwLock::new(None),
323 config_meta_overrides: RwLock::new(HashMap::new()),
324 session_cwd_cache: RwLock::new(HashMap::new()),
325 session_stats: RwLock::new(HashMap::new()),
326 active_requests: RwLock::new(HashMap::new()),
327 recent_finished: RwLock::new(VecDeque::new()),
328 usage_rollups: RwLock::new(HashMap::new()),
329 config_health: RwLock::new(HashMap::new()),
330 health_checks: RwLock::new(HashMap::new()),
331 lb_states,
332 })
333 }
334
335 pub async fn get_session_effort_override(&self, session_id: &str) -> Option<String> {
336 let guard = self.session_effort_overrides.read().await;
337 guard.get(session_id).map(|v| v.effort.clone())
338 }
339
340 pub async fn set_session_effort_override(
341 &self,
342 session_id: String,
343 effort: String,
344 now_ms: u64,
345 ) {
346 let mut guard = self.session_effort_overrides.write().await;
347 guard.insert(
348 session_id,
349 SessionEffortOverride {
350 effort,
351 updated_at_ms: now_ms,
352 last_seen_ms: now_ms,
353 },
354 );
355 }
356
357 pub async fn clear_session_effort_override(&self, session_id: &str) {
358 let mut guard = self.session_effort_overrides.write().await;
359 guard.remove(session_id);
360 }
361
362 pub async fn list_session_effort_overrides(&self) -> HashMap<String, String> {
363 let guard = self.session_effort_overrides.read().await;
364 guard
365 .iter()
366 .map(|(k, v)| (k.clone(), v.effort.clone()))
367 .collect()
368 }
369
370 pub async fn touch_session_override(&self, session_id: &str, now_ms: u64) {
371 let mut guard = self.session_effort_overrides.write().await;
372 if let Some(v) = guard.get_mut(session_id) {
373 v.last_seen_ms = now_ms;
374 }
375 }
376
377 pub async fn get_session_config_override(&self, session_id: &str) -> Option<String> {
378 let guard = self.session_config_overrides.read().await;
379 guard.get(session_id).map(|v| v.config_name.clone())
380 }
381
382 pub async fn set_session_config_override(
383 &self,
384 session_id: String,
385 config_name: String,
386 now_ms: u64,
387 ) {
388 let mut guard = self.session_config_overrides.write().await;
389 guard.insert(
390 session_id,
391 SessionConfigOverride {
392 config_name,
393 updated_at_ms: now_ms,
394 last_seen_ms: now_ms,
395 },
396 );
397 }
398
399 pub async fn clear_session_config_override(&self, session_id: &str) {
400 let mut guard = self.session_config_overrides.write().await;
401 guard.remove(session_id);
402 }
403
404 pub async fn list_session_config_overrides(&self) -> HashMap<String, String> {
405 let guard = self.session_config_overrides.read().await;
406 guard
407 .iter()
408 .map(|(k, v)| (k.clone(), v.config_name.clone()))
409 .collect()
410 }
411
412 pub async fn touch_session_config_override(&self, session_id: &str, now_ms: u64) {
413 let mut guard = self.session_config_overrides.write().await;
414 if let Some(v) = guard.get_mut(session_id) {
415 v.last_seen_ms = now_ms;
416 }
417 }
418
419 pub async fn get_global_config_override(&self) -> Option<String> {
420 let guard = self.global_config_override.read().await;
421 guard.clone()
422 }
423
424 pub async fn set_global_config_override(&self, config_name: String, _now_ms: u64) {
425 let mut guard = self.global_config_override.write().await;
426 *guard = Some(config_name);
427 }
428
429 pub async fn clear_global_config_override(&self) {
430 let mut guard = self.global_config_override.write().await;
431 *guard = None;
432 }
433
434 pub async fn set_config_enabled_override(
435 &self,
436 service_name: &str,
437 config_name: String,
438 enabled: bool,
439 now_ms: u64,
440 ) {
441 let mut guard = self.config_meta_overrides.write().await;
442 let per_service = guard.entry(service_name.to_string()).or_default();
443 let entry = per_service.entry(config_name).or_default();
444 entry.enabled = Some(enabled);
445 entry.updated_at_ms = now_ms;
446 }
447
448 pub async fn set_config_level_override(
449 &self,
450 service_name: &str,
451 config_name: String,
452 level: u8,
453 now_ms: u64,
454 ) {
455 let mut guard = self.config_meta_overrides.write().await;
456 let per_service = guard.entry(service_name.to_string()).or_default();
457 let entry = per_service.entry(config_name).or_default();
458 entry.level = Some(level.clamp(1, 10));
459 entry.updated_at_ms = now_ms;
460 }
461
462 pub async fn get_config_meta_overrides(
463 &self,
464 service_name: &str,
465 ) -> HashMap<String, (Option<bool>, Option<u8>)> {
466 let guard = self.config_meta_overrides.read().await;
467 guard
468 .get(service_name)
469 .map(|m| {
470 m.iter()
471 .map(|(k, v)| (k.clone(), (v.enabled, v.level)))
472 .collect::<HashMap<_, _>>()
473 })
474 .unwrap_or_default()
475 }
476
477 pub async fn record_config_health(
478 &self,
479 service_name: &str,
480 config_name: String,
481 health: ConfigHealth,
482 ) {
483 let mut guard = self.config_health.write().await;
484 let per_service = guard.entry(service_name.to_string()).or_default();
485 per_service.insert(config_name, health);
486 }
487
488 pub async fn get_config_health(&self, service_name: &str) -> HashMap<String, ConfigHealth> {
489 let guard = self.config_health.read().await;
490 guard.get(service_name).cloned().unwrap_or_default()
491 }
492
    /// Snapshots the shared load-balancer state into serializable views,
    /// one per config. Returns empty when no LB state was wired in.
    ///
    /// Also normalizes the three per-upstream vectors to a common length
    /// in place, which is why this takes the mutex mutably.
    pub async fn get_lb_view(&self) -> HashMap<String, LbConfigView> {
        let Some(lb_states) = self.lb_states.as_ref() else {
            return HashMap::new();
        };
        // Recover from a poisoned mutex rather than propagating the panic.
        let mut map = match lb_states.lock() {
            Ok(m) => m,
            Err(e) => e.into_inner(),
        };

        let now = std::time::Instant::now();
        let mut out = HashMap::new();
        for (cfg_name, st) in map.iter_mut() {
            // The three vectors may have drifted in length; use the longest
            // as the authoritative upstream count.
            let len = st
                .failure_counts
                .len()
                .max(st.cooldown_until.len())
                .max(st.usage_exhausted.len());
            if len == 0 {
                continue;
            }

            // Defensive normalization: pad the shorter vectors with neutral
            // defaults so indexing below is uniform.
            if st.failure_counts.len() != len {
                st.failure_counts.resize(len, 0);
            }
            if st.cooldown_until.len() != len {
                st.cooldown_until.resize(len, None);
            }
            if st.usage_exhausted.len() != len {
                st.usage_exhausted.resize(len, false);
            }

            let mut upstreams = Vec::with_capacity(len);
            for idx in 0..len {
                let failure_count = st.failure_counts.get(idx).copied().unwrap_or(0);
                // Remaining cooldown, dropped entirely once it rounds to 0s.
                let cooldown_remaining_secs = st
                    .cooldown_until
                    .get(idx)
                    .and_then(|v| *v)
                    .map(|until| until.saturating_duration_since(now).as_secs())
                    .filter(|&s| s > 0);
                let usage_exhausted = st.usage_exhausted.get(idx).copied().unwrap_or(false);
                upstreams.push(LbUpstreamView {
                    failure_count,
                    cooldown_remaining_secs,
                    usage_exhausted,
                });
            }

            out.insert(
                cfg_name.clone(),
                LbConfigView {
                    last_good_index: st.last_good_index,
                    upstreams,
                },
            );
        }
        out
    }
552
553 pub async fn list_health_checks(
554 &self,
555 service_name: &str,
556 ) -> HashMap<String, HealthCheckStatus> {
557 let guard = self.health_checks.read().await;
558 guard.get(service_name).cloned().unwrap_or_default()
559 }
560
561 pub async fn try_begin_health_check(
562 &self,
563 service_name: &str,
564 config_name: &str,
565 total: usize,
566 now_ms: u64,
567 ) -> bool {
568 let mut guard = self.health_checks.write().await;
569 let per_service = guard.entry(service_name.to_string()).or_default();
570 if let Some(existing) = per_service.get(config_name)
571 && !existing.done
572 {
573 return false;
574 }
575 per_service.insert(
576 config_name.to_string(),
577 HealthCheckStatus {
578 started_at_ms: now_ms,
579 updated_at_ms: now_ms,
580 total: total.min(u32::MAX as usize) as u32,
581 completed: 0,
582 ok: 0,
583 err: 0,
584 cancel_requested: false,
585 canceled: false,
586 done: false,
587 last_error: None,
588 },
589 );
590 true
591 }
592
593 pub async fn request_cancel_health_check(
594 &self,
595 service_name: &str,
596 config_name: &str,
597 now_ms: u64,
598 ) -> bool {
599 let mut guard = self.health_checks.write().await;
600 let Some(per_service) = guard.get_mut(service_name) else {
601 return false;
602 };
603 let Some(st) = per_service.get_mut(config_name) else {
604 return false;
605 };
606 if st.done {
607 return false;
608 }
609 st.cancel_requested = true;
610 st.updated_at_ms = now_ms;
611 true
612 }
613
614 pub async fn is_health_check_cancel_requested(
615 &self,
616 service_name: &str,
617 config_name: &str,
618 ) -> bool {
619 let guard = self.health_checks.read().await;
620 guard
621 .get(service_name)
622 .and_then(|m| m.get(config_name))
623 .is_some_and(|s| s.cancel_requested && !s.done)
624 }
625
626 pub async fn record_health_check_result(
627 &self,
628 service_name: &str,
629 config_name: &str,
630 now_ms: u64,
631 upstream: UpstreamHealth,
632 ) {
633 {
634 let mut guard = self.config_health.write().await;
635 let per_service = guard.entry(service_name.to_string()).or_default();
636 let entry = per_service
637 .entry(config_name.to_string())
638 .or_insert_with(|| ConfigHealth {
639 checked_at_ms: now_ms,
640 upstreams: Vec::new(),
641 });
642 entry.checked_at_ms = entry.checked_at_ms.max(now_ms);
643 entry.upstreams.push(upstream.clone());
644 if entry.upstreams.len() > Self::MAX_HEALTH_RECORDS_PER_CONFIG {
645 let extra = entry
646 .upstreams
647 .len()
648 .saturating_sub(Self::MAX_HEALTH_RECORDS_PER_CONFIG);
649 if extra > 0 {
650 entry.upstreams.drain(0..extra);
651 }
652 }
653 }
654
655 let mut guard = self.health_checks.write().await;
656 let per_service = guard.entry(service_name.to_string()).or_default();
657 let st = per_service.entry(config_name.to_string()).or_default();
658 st.updated_at_ms = now_ms;
659 st.completed = st.completed.saturating_add(1);
660 match upstream.ok {
661 Some(true) => st.ok = st.ok.saturating_add(1),
662 Some(false) => {
663 st.err = st.err.saturating_add(1);
664 if st.last_error.is_none() {
665 st.last_error = upstream.error.clone();
666 }
667 }
668 None => {}
669 }
670 }
671
672 pub async fn finish_health_check(
673 &self,
674 service_name: &str,
675 config_name: &str,
676 now_ms: u64,
677 canceled: bool,
678 ) {
679 let mut guard = self.health_checks.write().await;
680 let per_service = guard.entry(service_name.to_string()).or_default();
681 let st = per_service.entry(config_name.to_string()).or_default();
682 st.updated_at_ms = now_ms;
683 st.canceled = canceled;
684 st.done = true;
685 }
686
687 pub async fn get_usage_rollup_view(
688 &self,
689 service_name: &str,
690 top_n: usize,
691 days: usize,
692 ) -> UsageRollupView {
693 let guard = self.usage_rollups.read().await;
694 let Some(rollup) = guard.get(service_name) else {
695 return UsageRollupView::default();
696 };
697
698 fn day_series(map: &HashMap<i32, UsageBucket>, days: usize) -> Vec<(i32, UsageBucket)> {
699 let mut out = map.iter().map(|(k, v)| (*k, v.clone())).collect::<Vec<_>>();
700 out.sort_by_key(|(k, _)| *k);
701 if out.len() > days {
702 out = out[out.len().saturating_sub(days)..].to_vec();
703 }
704 out
705 }
706
707 let mut by_day = rollup
708 .by_day
709 .iter()
710 .map(|(k, v)| (*k, v.clone()))
711 .collect::<Vec<_>>();
712 by_day.sort_by_key(|(k, _)| *k);
713 if by_day.len() > days {
714 by_day = by_day[by_day.len().saturating_sub(days)..].to_vec();
715 }
716
717 let mut by_config = rollup
718 .by_config
719 .iter()
720 .map(|(k, v)| (k.clone(), v.clone()))
721 .collect::<Vec<_>>();
722 by_config.sort_by_key(|(_, v)| std::cmp::Reverse(v.usage.total_tokens));
723 by_config.truncate(top_n);
724
725 let mut by_provider = rollup
726 .by_provider
727 .iter()
728 .map(|(k, v)| (k.clone(), v.clone()))
729 .collect::<Vec<_>>();
730 by_provider.sort_by_key(|(_, v)| std::cmp::Reverse(v.usage.total_tokens));
731 by_provider.truncate(top_n);
732
733 let mut by_config_day = HashMap::new();
734 for (name, _) in &by_config {
735 if let Some(m) = rollup.by_config_day.get(name) {
736 by_config_day.insert(name.clone(), day_series(m, days));
737 } else {
738 by_config_day.insert(name.clone(), Vec::new());
739 }
740 }
741
742 let mut by_provider_day = HashMap::new();
743 for (name, _) in &by_provider {
744 if let Some(m) = rollup.by_provider_day.get(name) {
745 by_provider_day.insert(name.clone(), day_series(m, days));
746 } else {
747 by_provider_day.insert(name.clone(), Vec::new());
748 }
749 }
750
751 UsageRollupView {
752 since_start: rollup.since_start.clone(),
753 by_day,
754 by_config,
755 by_config_day,
756 by_provider,
757 by_provider_day,
758 }
759 }
760
    /// Rebuilds the in-memory usage rollup for `service_name` from the tail
    /// of a JSON-lines request log, returning the number of replayed events.
    ///
    /// Skips entirely (returns 0) when replay is disabled via
    /// `CODEX_HELPER_USAGE_REPLAY_ON_STARTUP`, when the rollup already has
    /// data, when the log is missing/unreadable, or when nothing matches.
    /// The tail is bounded by `CODEX_HELPER_USAGE_REPLAY_MAX_BYTES`
    /// (default 8 MiB) and `CODEX_HELPER_USAGE_REPLAY_MAX_LINES`
    /// (default 20000). Malformed lines are silently skipped.
    ///
    /// NOTE(review): uses blocking std::fs I/O inside an async fn —
    /// presumably acceptable because this runs once at startup; confirm.
    /// NOTE(review): the "already has data" check and the final write are
    /// separate lock acquisitions, so a concurrent writer could race this;
    /// presumably harmless at startup — confirm.
    pub async fn replay_usage_from_requests_log(
        &self,
        service_name: &str,
        log_path: PathBuf,
        base_url_to_provider_id: HashMap<String, String>,
    ) -> usize {
        // Replay is opt-out: any unset/unrecognized value means enabled.
        let enabled = std::env::var("CODEX_HELPER_USAGE_REPLAY_ON_STARTUP")
            .ok()
            .map(|v| {
                matches!(
                    v.trim().to_ascii_lowercase().as_str(),
                    "1" | "true" | "yes" | "y" | "on"
                )
            })
            .unwrap_or(true);
        if !enabled {
            return 0;
        }

        // Never replay over live data.
        let already_has_data = {
            let guard = self.usage_rollups.read().await;
            guard
                .get(service_name)
                .is_some_and(|r| r.since_start.requests_total > 0)
        };
        if already_has_data {
            return 0;
        }

        if !log_path.exists() {
            return 0;
        }

        let max_bytes = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_BYTES")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&n| n > 0)
            .unwrap_or(8 * 1024 * 1024);
        let max_lines = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_LINES")
            .ok()
            .and_then(|s| s.trim().parse::<usize>().ok())
            .filter(|&n| n > 0)
            .unwrap_or(20_000);

        // Read at most max_bytes from the end of the file.
        let mut file = match std::fs::File::open(&log_path) {
            Ok(f) => f,
            Err(_) => return 0,
        };
        let len: u64 = file.metadata().map(|m| m.len()).unwrap_or_default();
        let start = len.saturating_sub(max_bytes as u64);
        if file.seek(SeekFrom::Start(start)).is_err() {
            return 0;
        }
        let mut buf = Vec::new();
        if file.read_to_end(&mut buf).is_err() {
            return 0;
        }
        // If we started mid-file, drop the (likely partial) first line.
        if start > 0 {
            if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
                buf = buf[pos + 1..].to_vec();
            } else {
                return 0;
            }
        }

        let text = match std::str::from_utf8(&buf) {
            Ok(s) => s,
            Err(_) => return 0,
        };
        let lines = text
            .lines()
            .map(|l| l.trim())
            .filter(|l| !l.is_empty())
            .collect::<Vec<_>>();
        // Keep only the newest max_lines lines.
        let start_idx = lines.len().saturating_sub(max_lines);

        // Parse each JSON line into an event tuple, skipping anything
        // malformed or belonging to another service.
        let mut events = Vec::new();
        for line in &lines[start_idx..] {
            let Ok(v) = serde_json::from_str::<JsonValue>(line) else {
                continue;
            };
            let Some(svc) = v.get("service").and_then(|x| x.as_str()) else {
                continue;
            };
            if svc != service_name {
                continue;
            }

            let ended_at_ms = v.get("timestamp_ms").and_then(|x| x.as_u64()).unwrap_or(0);
            let status_code = v.get("status_code").and_then(|x| x.as_u64()).unwrap_or(0) as u16;
            let duration_ms = v.get("duration_ms").and_then(|x| x.as_u64()).unwrap_or(0);
            let config_name = v
                .get("config_name")
                .and_then(|x| x.as_str())
                .unwrap_or("-")
                .to_string();
            let upstream_base_url = v
                .get("upstream_base_url")
                .and_then(|x| x.as_str())
                .unwrap_or("-")
                .to_string();
            // Prefer the logged provider id; fall back to mapping the
            // upstream base URL, then to the "-" placeholder.
            let provider_id = v
                .get("provider_id")
                .and_then(|x| x.as_str())
                .map(|s| s.to_string())
                .or_else(|| base_url_to_provider_id.get(&upstream_base_url).cloned())
                .unwrap_or_else(|| "-".to_string());
            let usage = v
                .get("usage")
                .and_then(|u| serde_json::from_value::<UsageMetrics>(u.clone()).ok());
            let ttfb_ms = v.get("ttfb_ms").and_then(|x| x.as_u64());

            events.push((
                ended_at_ms,
                status_code,
                duration_ms,
                config_name,
                provider_id,
                usage,
                ttfb_ms,
            ));
        }

        if events.is_empty() {
            return 0;
        }

        // Fold every event into each rollup dimension, mirroring
        // finish_request's recording.
        let mut guard = self.usage_rollups.write().await;
        let rollup = guard.entry(service_name.to_string()).or_default();
        for (ended_at_ms, status_code, duration_ms, cfg_key, provider_key, usage, ttfb_ms) in
            events.iter()
        {
            let day = (*ended_at_ms / 86_400_000) as i32;
            rollup
                .since_start
                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
            rollup.by_day.entry(day).or_default().record(
                *status_code,
                *duration_ms,
                usage.as_ref(),
                *ttfb_ms,
            );
            rollup.by_config.entry(cfg_key.clone()).or_default().record(
                *status_code,
                *duration_ms,
                usage.as_ref(),
                *ttfb_ms,
            );
            rollup
                .by_config_day
                .entry(cfg_key.clone())
                .or_default()
                .entry(day)
                .or_default()
                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
            rollup
                .by_provider
                .entry(provider_key.clone())
                .or_default()
                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
            rollup
                .by_provider_day
                .entry(provider_key.clone())
                .or_default()
                .entry(day)
                .or_default()
                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
        }

        events.len()
    }
932
    /// Resolves a session id to its working directory, caching both hits
    /// and misses (`cwd: None`) in `session_cwd_cache`.
    ///
    /// A `session_cwd_cache_max_entries` of 0 disables caching entirely.
    /// NOTE(review): inserts here are unbounded — neither the max-entries
    /// cap (beyond the 0 check) nor the TTL is enforced in this method;
    /// presumably a background pruner handles eviction — confirm.
    pub async fn resolve_session_cwd(&self, session_id: &str) -> Option<String> {
        if self.session_cwd_cache_max_entries == 0 {
            return sessions::find_codex_session_cwd_by_id(session_id)
                .await
                .ok()
                .flatten();
        }

        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        // Fast path: cache hit. The read lock is dropped before re-locking
        // for write to refresh last_seen_ms (the entry may vanish between
        // the two locks, hence the second get_mut check).
        {
            let guard = self.session_cwd_cache.read().await;
            if let Some(v) = guard.get(session_id) {
                let out = v.cwd.clone();
                drop(guard);
                let mut guard = self.session_cwd_cache.write().await;
                if let Some(v) = guard.get_mut(session_id) {
                    v.last_seen_ms = now_ms;
                }
                return out;
            }
        }

        // Slow path: do the lookup (failures collapse to None) and cache
        // the result either way.
        let resolved = sessions::find_codex_session_cwd_by_id(session_id)
            .await
            .ok()
            .flatten();

        let mut guard = self.session_cwd_cache.write().await;
        guard.insert(
            session_id.to_string(),
            SessionCwdCacheEntry {
                cwd: resolved.clone(),
                last_seen_ms: now_ms,
            },
        );
        resolved
    }
976
977 #[allow(clippy::too_many_arguments)]
978 pub async fn begin_request(
979 &self,
980 service: &str,
981 method: &str,
982 path: &str,
983 session_id: Option<String>,
984 cwd: Option<String>,
985 model: Option<String>,
986 reasoning_effort: Option<String>,
987 started_at_ms: u64,
988 ) -> u64 {
989 let id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
990 let req = ActiveRequest {
991 id,
992 session_id,
993 cwd,
994 model,
995 reasoning_effort,
996 config_name: None,
997 provider_id: None,
998 upstream_base_url: None,
999 service: service.to_string(),
1000 method: method.to_string(),
1001 path: path.to_string(),
1002 started_at_ms,
1003 };
1004 let mut guard = self.active_requests.write().await;
1005 guard.insert(id, req);
1006 id
1007 }
1008
1009 pub async fn update_request_route(
1010 &self,
1011 request_id: u64,
1012 config_name: String,
1013 provider_id: Option<String>,
1014 upstream_base_url: String,
1015 ) {
1016 let mut guard = self.active_requests.write().await;
1017 let Some(req) = guard.get_mut(&request_id) else {
1018 return;
1019 };
1020 req.config_name = Some(config_name);
1021 req.provider_id = provider_id;
1022 req.upstream_base_url = Some(upstream_base_url);
1023 }
1024
1025 pub async fn finish_request(&self, params: FinishRequestParams) {
1026 let mut active = self.active_requests.write().await;
1027 let Some(req) = active.remove(¶ms.id) else {
1028 return;
1029 };
1030
1031 let finished = FinishedRequest {
1032 id: params.id,
1033 session_id: req.session_id,
1034 cwd: req.cwd,
1035 model: req.model,
1036 reasoning_effort: req.reasoning_effort,
1037 config_name: req.config_name,
1038 provider_id: req.provider_id,
1039 upstream_base_url: req.upstream_base_url,
1040 usage: params.usage.clone(),
1041 retry: params.retry,
1042 service: req.service,
1043 method: req.method,
1044 path: req.path,
1045 status_code: params.status_code,
1046 duration_ms: params.duration_ms,
1047 ttfb_ms: params.ttfb_ms,
1048 ended_at_ms: params.ended_at_ms,
1049 };
1050
1051 {
1052 let day = (finished.ended_at_ms / 86_400_000) as i32;
1053 let cfg_key = finished
1054 .config_name
1055 .clone()
1056 .unwrap_or_else(|| "-".to_string());
1057 let provider_key = finished
1058 .provider_id
1059 .clone()
1060 .unwrap_or_else(|| "-".to_string());
1061
1062 let mut rollups = self.usage_rollups.write().await;
1063 let rollup = rollups.entry(finished.service.clone()).or_default();
1064 rollup.since_start.record(
1065 finished.status_code,
1066 finished.duration_ms,
1067 finished.usage.as_ref(),
1068 finished.ttfb_ms,
1069 );
1070 rollup.by_day.entry(day).or_default().record(
1071 finished.status_code,
1072 finished.duration_ms,
1073 finished.usage.as_ref(),
1074 finished.ttfb_ms,
1075 );
1076 rollup.by_config.entry(cfg_key.clone()).or_default().record(
1077 finished.status_code,
1078 finished.duration_ms,
1079 finished.usage.as_ref(),
1080 finished.ttfb_ms,
1081 );
1082 rollup
1083 .by_config_day
1084 .entry(cfg_key)
1085 .or_default()
1086 .entry(day)
1087 .or_default()
1088 .record(
1089 finished.status_code,
1090 finished.duration_ms,
1091 finished.usage.as_ref(),
1092 finished.ttfb_ms,
1093 );
1094
1095 rollup
1096 .by_provider
1097 .entry(provider_key.clone())
1098 .or_default()
1099 .record(
1100 finished.status_code,
1101 finished.duration_ms,
1102 finished.usage.as_ref(),
1103 finished.ttfb_ms,
1104 );
1105 rollup
1106 .by_provider_day
1107 .entry(provider_key)
1108 .or_default()
1109 .entry(day)
1110 .or_default()
1111 .record(
1112 finished.status_code,
1113 finished.duration_ms,
1114 finished.usage.as_ref(),
1115 finished.ttfb_ms,
1116 );
1117 }
1118
1119 if let Some(sid) = finished.session_id.as_deref() {
1120 let mut stats = self.session_stats.write().await;
1121 let entry = stats.entry(sid.to_string()).or_default();
1122 entry.turns_total = entry.turns_total.saturating_add(1);
1123 entry.last_model = finished.model.clone().or(entry.last_model.clone());
1124 entry.last_reasoning_effort = finished
1125 .reasoning_effort
1126 .clone()
1127 .or(entry.last_reasoning_effort.clone());
1128 entry.last_provider_id = finished
1129 .provider_id
1130 .clone()
1131 .or(entry.last_provider_id.clone());
1132 entry.last_config_name = finished
1133 .config_name
1134 .clone()
1135 .or(entry.last_config_name.clone());
1136 if let Some(u) = finished.usage.as_ref() {
1137 entry.last_usage = Some(u.clone());
1138 entry.total_usage.add_assign(u);
1139 entry.turns_with_usage = entry.turns_with_usage.saturating_add(1);
1140 }
1141 entry.last_status = Some(finished.status_code);
1142 entry.last_duration_ms = Some(finished.duration_ms);
1143 entry.last_ended_at_ms = Some(finished.ended_at_ms);
1144 entry.last_seen_ms = finished.ended_at_ms;
1145 }
1146
1147 let mut recent = self.recent_finished.write().await;
1148 recent.push_front(finished);
1149 while recent.len() > recent_finished_max() {
1150 recent.pop_back();
1151 }
1152 }
1153
1154 pub async fn list_active_requests(&self) -> Vec<ActiveRequest> {
1155 let guard = self.active_requests.read().await;
1156 let mut vec = guard.values().cloned().collect::<Vec<_>>();
1157 vec.sort_by_key(|r| r.started_at_ms);
1158 vec
1159 }
1160
1161 pub async fn list_recent_finished(&self, limit: usize) -> Vec<FinishedRequest> {
1162 let guard = self.recent_finished.read().await;
1163 guard.iter().take(limit).cloned().collect()
1164 }
1165
1166 pub async fn list_session_stats(&self) -> HashMap<String, SessionStats> {
1167 let guard = self.session_stats.read().await;
1168 guard.clone()
1169 }
1170
1171 pub fn spawn_cleanup_task(state: Arc<Self>) {
1172 tokio::spawn(async move {
1174 let mut tick = interval(Duration::from_secs(30));
1175 loop {
1176 tick.tick().await;
1177 state.prune_periodic().await;
1178 }
1179 });
1180 }
1181
1182 async fn prune_periodic(&self) {
1183 let now_ms = std::time::SystemTime::now()
1184 .duration_since(std::time::UNIX_EPOCH)
1185 .map(|d| d.as_millis() as u64)
1186 .unwrap_or(0);
1187
1188 let active = self.active_requests.read().await;
1190 let mut active_sessions: HashMap<String, ()> = HashMap::new();
1191 for req in active.values() {
1192 if let Some(sid) = req.session_id.as_deref() {
1193 active_sessions.insert(sid.to_string(), ());
1194 }
1195 }
1196
1197 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1198 let cutoff_override = now_ms - self.session_override_ttl_ms;
1199 let mut overrides = self.session_effort_overrides.write().await;
1200 overrides.retain(|sid, v| {
1201 if active_sessions.contains_key(sid) {
1202 return true;
1203 }
1204 v.last_seen_ms >= cutoff_override
1205 });
1206 }
1207
1208 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1209 let cutoff_override = now_ms - self.session_override_ttl_ms;
1210 let mut overrides = self.session_config_overrides.write().await;
1211 overrides.retain(|sid, v| {
1212 if active_sessions.contains_key(sid) {
1213 return true;
1214 }
1215 v.last_seen_ms >= cutoff_override
1216 });
1217 }
1218
1219 let keep_days: i32 = std::env::var("CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS")
1221 .ok()
1222 .and_then(|s| s.trim().parse::<i32>().ok())
1223 .filter(|&n| n > 0)
1224 .unwrap_or(60);
1225 let now_day = (now_ms / 86_400_000) as i32;
1226 let cutoff_day = now_day.saturating_sub(keep_days);
1227 let mut rollups = self.usage_rollups.write().await;
1228 for rollup in rollups.values_mut() {
1229 rollup.by_day.retain(|day, _| *day >= cutoff_day);
1230 rollup.by_config_day.retain(|_, m| {
1231 m.retain(|day, _| *day >= cutoff_day);
1232 !m.is_empty()
1233 });
1234 rollup.by_provider_day.retain(|_, m| {
1235 m.retain(|day, _| *day >= cutoff_day);
1236 !m.is_empty()
1237 });
1238 }
1239
1240 let cutoff_cwd =
1241 if self.session_cwd_cache_ttl_ms == 0 || now_ms < self.session_cwd_cache_ttl_ms {
1242 0
1243 } else {
1244 now_ms - self.session_cwd_cache_ttl_ms
1245 };
1246 self.prune_session_cwd_cache(&active_sessions, cutoff_cwd)
1247 .await;
1248
1249 if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1250 let cutoff_stats = now_ms - self.session_override_ttl_ms;
1251 let mut stats = self.session_stats.write().await;
1252 stats.retain(|sid, v| {
1253 active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_stats
1254 });
1255 }
1256 }
1257
1258 async fn prune_session_cwd_cache(&self, active_sessions: &HashMap<String, ()>, cutoff: u64) {
1259 if self.session_cwd_cache_max_entries == 0 {
1260 return;
1261 }
1262 let mut cache = self.session_cwd_cache.write().await;
1263
1264 if self.session_cwd_cache_ttl_ms > 0 {
1265 cache.retain(|sid, v| {
1266 if active_sessions.contains_key(sid) {
1267 return true;
1268 }
1269 v.last_seen_ms >= cutoff
1270 });
1271 }
1272
1273 let max = self.session_cwd_cache_max_entries;
1274 if max == 0 || cache.len() <= max {
1275 return;
1276 }
1277
1278 let mut keys = cache
1280 .iter()
1281 .map(|(sid, v)| (sid.clone(), v.last_seen_ms))
1282 .collect::<Vec<_>>();
1283 keys.sort_by_key(|(_, t)| *t);
1284 let remove_count = keys.len().saturating_sub(max);
1285 for (sid, _) in keys.into_iter().take(remove_count) {
1286 cache.remove(&sid);
1287 }
1288 }
1289}