Skip to main content

codex_helper_core/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::io::{Read, Seek, SeekFrom};
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::Mutex;
6use std::sync::OnceLock;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value as JsonValue;
11use tokio::sync::RwLock;
12use tokio::time::{Duration, interval};
13
14use crate::lb::LbState;
15use crate::logging::RetryInfo;
16use crate::sessions;
17use crate::usage::UsageMetrics;
18
/// Capacity of the in-memory ring of recently finished requests.
///
/// Read once (memoized in a `OnceLock`) from
/// `CODEX_HELPER_RECENT_FINISHED_MAX`; only positive integers are accepted.
/// Defaults to 2000 and is clamped into [200, 20_000].
fn recent_finished_max() -> usize {
    static MAX: OnceLock<usize> = OnceLock::new();
    *MAX.get_or_init(|| {
        let configured = std::env::var("CODEX_HELPER_RECENT_FINISHED_MAX")
            .ok()
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            .filter(|&n| n > 0);
        configured.unwrap_or(2_000).clamp(200, 20_000)
    })
}
30
/// Aggregated request/usage counters for one rollup dimension
/// (since-start, per-day, per-config, or per-provider).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UsageBucket {
    /// Total requests folded into this bucket.
    pub requests_total: u64,
    /// Requests with status code >= 400.
    pub requests_error: u64,
    /// Sum of wall-clock durations (ms) over all requests.
    pub duration_ms_total: u64,
    /// Requests that carried usage metrics.
    pub requests_with_usage: u64,
    /// Duration sum (ms) restricted to requests with usage metrics.
    pub duration_ms_with_usage_total: u64,
    /// Duration minus TTFB (ms), summed — only for requests with usage.
    pub generation_ms_total: u64,
    /// Sum of time-to-first-byte samples (ms).
    pub ttfb_ms_total: u64,
    /// Number of samples accumulated into `ttfb_ms_total`.
    pub ttfb_samples: u64,
    /// Token usage totals accumulated from responses.
    pub usage: UsageMetrics,
}
43
44impl UsageBucket {
45    fn record(
46        &mut self,
47        status_code: u16,
48        duration_ms: u64,
49        usage: Option<&UsageMetrics>,
50        ttfb_ms: Option<u64>,
51    ) {
52        self.requests_total = self.requests_total.saturating_add(1);
53        if status_code >= 400 {
54            self.requests_error = self.requests_error.saturating_add(1);
55        }
56        self.duration_ms_total = self.duration_ms_total.saturating_add(duration_ms);
57        if let Some(u) = usage {
58            self.usage.add_assign(u);
59            self.requests_with_usage = self.requests_with_usage.saturating_add(1);
60            self.duration_ms_with_usage_total = self
61                .duration_ms_with_usage_total
62                .saturating_add(duration_ms);
63
64            let gen_ms = match ttfb_ms {
65                Some(ttfb) if ttfb > 0 && ttfb < duration_ms => duration_ms.saturating_sub(ttfb),
66                _ => duration_ms,
67            };
68            self.generation_ms_total = self.generation_ms_total.saturating_add(gen_ms);
69            if let Some(ttfb) = ttfb_ms.filter(|v| *v > 0) {
70                self.ttfb_ms_total = self.ttfb_ms_total.saturating_add(ttfb);
71                self.ttfb_samples = self.ttfb_samples.saturating_add(1);
72            }
73        }
74    }
75}
76
/// Display-ready snapshot of a `UsageRollup` with maps flattened into
/// sorted vectors (built by `ProxyState::get_usage_rollup_view`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UsageRollupView {
    /// Aggregate over everything recorded since process start.
    pub since_start: UsageBucket,
    /// Per-day buckets keyed by day index (epoch ms / 86_400_000), ascending.
    pub by_day: Vec<(i32, UsageBucket)>,
    /// Top-N configs by total tokens, descending.
    pub by_config: Vec<(String, UsageBucket)>,
    /// Day series for each name present in `by_config`.
    pub by_config_day: HashMap<String, Vec<(i32, UsageBucket)>>,
    /// Top-N providers by total tokens, descending.
    pub by_provider: Vec<(String, UsageBucket)>,
    /// Day series for each name present in `by_provider`.
    pub by_provider_day: HashMap<String, Vec<(i32, UsageBucket)>>,
}
86
/// Mutable in-memory accumulator behind `UsageRollupView`.
/// Runtime-only; keyed by day index, config name, and provider id.
#[derive(Debug, Clone, Default)]
struct UsageRollup {
    since_start: UsageBucket,
    by_day: HashMap<i32, UsageBucket>,
    by_config: HashMap<String, UsageBucket>,
    by_config_day: HashMap<String, HashMap<i32, UsageBucket>>,
    by_provider: HashMap<String, UsageBucket>,
    by_provider_day: HashMap<String, HashMap<i32, UsageBucket>>,
}
96
/// Result of probing one upstream endpoint during a health check.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct UpstreamHealth {
    pub base_url: String,
    /// Overall verdict; `None` when the probe produced no verdict.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ok: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_ms: Option<u64>,
    /// Probe error message, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
109
/// Rolling health record for one config: most recent check timestamp plus
/// its upstream probe results (capped at 200 entries — see
/// `ProxyState::MAX_HEALTH_RECORDS_PER_CONFIG`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct ConfigHealth {
    pub checked_at_ms: u64,
    #[serde(default)]
    pub upstreams: Vec<UpstreamHealth>,
}
116
/// UI-facing view of one upstream's load-balancer state.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LbUpstreamView {
    pub failure_count: u32,
    /// Seconds left on the cooldown; `None` when not cooling down.
    pub cooldown_remaining_secs: Option<u64>,
    pub usage_exhausted: bool,
}
123
/// UI-facing view of one config's load-balancer state
/// (snapshot produced by `ProxyState::get_lb_view`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LbConfigView {
    /// Index of the last upstream that served successfully, if any.
    pub last_good_index: Option<usize>,
    pub upstreams: Vec<LbUpstreamView>,
}
129
/// Progress of one in-flight (or finished) health-check run for a config.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct HealthCheckStatus {
    pub started_at_ms: u64,
    pub updated_at_ms: u64,
    /// Number of upstreams scheduled for probing.
    pub total: u32,
    /// Probes finished so far (ok + err + verdict-less).
    pub completed: u32,
    pub ok: u32,
    pub err: u32,
    /// Set when a caller asked for cancellation; workers poll this flag.
    pub cancel_requested: bool,
    pub canceled: bool,
    pub done: bool,
    /// First error message observed during this run, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
144
/// An in-flight proxied request, tracked for the dashboard.
/// Optional fields are omitted from serialized output when absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActiveRequest {
    /// Process-unique id (presumably from `ProxyState::next_request_id` —
    /// allocation not visible in this chunk).
    pub id: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    pub service: String,
    pub method: String,
    pub path: String,
    pub started_at_ms: u64,
}
167
/// A completed proxied request, kept in the bounded `recent_finished` ring.
/// Optional fields are omitted from serialized output when absent.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FinishedRequest {
    /// Same id the request had while active.
    pub id: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub upstream_base_url: Option<String>,
    /// Token usage parsed from the upstream response, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<UsageMetrics>,
    /// Retry metadata, if the request was retried.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry: Option<RetryInfo>,
    pub service: String,
    pub method: String,
    pub path: String,
    pub status_code: u16,
    pub duration_ms: u64,
    /// Time to first byte (ms), when measured.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttfb_ms: Option<u64>,
    pub ended_at_ms: u64,
}
198
/// Completion data for turning an `ActiveRequest` into a `FinishedRequest`
/// (presumably consumed by a finish-request method not visible in this chunk).
#[derive(Debug, Clone)]
pub struct FinishRequestParams {
    pub id: u64,
    pub status_code: u16,
    pub duration_ms: u64,
    pub ended_at_ms: u64,
    pub usage: Option<UsageMetrics>,
    pub retry: Option<RetryInfo>,
    pub ttfb_ms: Option<u64>,
}
209
/// Per-session aggregate statistics (one entry per session id).
/// `last_*` fields reflect the most recently finished turn.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct SessionStats {
    pub turns_total: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_model: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_reasoning_effort: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_provider_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_config_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_usage: Option<UsageMetrics>,
    /// Usage summed over all turns of this session.
    pub total_usage: UsageMetrics,
    pub turns_with_usage: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_status: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_duration_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_ended_at_ms: Option<u64>,
    pub last_seen_ms: u64,
}
233
/// Per-session reasoning-effort override; `last_seen_ms` is refreshed on
/// use (presumably for TTL-based expiry via `session_override_ttl_ms` —
/// eviction code not visible in this chunk).
#[derive(Debug, Clone)]
struct SessionEffortOverride {
    effort: String,
    #[allow(dead_code)]
    updated_at_ms: u64,
    last_seen_ms: u64,
}
241
/// Per-session config-name override; mirrors `SessionEffortOverride`'s
/// timestamp bookkeeping.
#[derive(Debug, Clone)]
struct SessionConfigOverride {
    config_name: String,
    #[allow(dead_code)]
    updated_at_ms: u64,
    last_seen_ms: u64,
}
249
/// Cached working directory for a session (`None` = looked up but unknown).
#[derive(Debug, Clone)]
struct SessionCwdCacheEntry {
    cwd: Option<String>,
    last_seen_ms: u64,
}
255
/// Runtime override of a config's metadata; `None` fields mean
/// "no override, keep the configured value".
#[derive(Debug, Clone, Default)]
struct ConfigMetaOverride {
    enabled: Option<bool>,
    /// Clamped to 1..=10 on write (see `set_config_level_override`).
    level: Option<u8>,
    #[allow(dead_code)]
    updated_at_ms: u64,
}
263
/// Runtime-only state for the proxy process.
///
/// This state is intentionally not persisted across restarts.
#[derive(Debug)]
pub struct ProxyState {
    /// Monotonic id source for requests (starts at 1).
    next_request_id: AtomicU64,
    /// TTL (ms) for per-session overrides.
    session_override_ttl_ms: u64,
    /// TTL (ms) for the session→cwd cache.
    session_cwd_cache_ttl_ms: u64,
    /// Max entries kept in the session→cwd cache.
    session_cwd_cache_max_entries: usize,
    // Keyed by session id.
    session_effort_overrides: RwLock<HashMap<String, SessionEffortOverride>>,
    session_config_overrides: RwLock<HashMap<String, SessionConfigOverride>>,
    /// Process-wide config override, taking no session into account.
    global_config_override: RwLock<Option<String>>,
    // service name -> config name -> override.
    config_meta_overrides: RwLock<HashMap<String, HashMap<String, ConfigMetaOverride>>>,
    session_cwd_cache: RwLock<HashMap<String, SessionCwdCacheEntry>>,
    session_stats: RwLock<HashMap<String, SessionStats>>,
    // Keyed by request id.
    active_requests: RwLock<HashMap<u64, ActiveRequest>>,
    /// Bounded ring of completed requests (capacity: `recent_finished_max()`).
    recent_finished: RwLock<VecDeque<FinishedRequest>>,
    // Keyed by service name.
    usage_rollups: RwLock<HashMap<String, UsageRollup>>,
    // service name -> config name -> health record.
    config_health: RwLock<HashMap<String, HashMap<String, ConfigHealth>>>,
    health_checks: RwLock<HashMap<String, HashMap<String, HealthCheckStatus>>>,
    /// Shared with the routing layer when load balancing is enabled.
    lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
}
286
287impl ProxyState {
    /// Hard cap on probe records retained per config in `config_health`.
    const MAX_HEALTH_RECORDS_PER_CONFIG: usize = 200;

    /// Convenience constructor without a shared load-balancer state map.
    #[allow(dead_code)]
    pub fn new() -> Arc<Self> {
        Self::new_with_lb_states(None)
    }
294
295    pub fn new_with_lb_states(
296        lb_states: Option<Arc<Mutex<HashMap<String, LbState>>>>,
297    ) -> Arc<Self> {
298        let ttl_secs = std::env::var("CODEX_HELPER_SESSION_OVERRIDE_TTL_SECS")
299            .ok()
300            .and_then(|s| s.trim().parse::<u64>().ok())
301            .filter(|&n| n > 0)
302            .unwrap_or(30 * 60);
303        let ttl_ms = ttl_secs.saturating_mul(1000);
304
305        let cwd_cache_ttl_secs = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_TTL_SECS")
306            .ok()
307            .and_then(|s| s.trim().parse::<u64>().ok())
308            .unwrap_or(12 * 60 * 60);
309        let cwd_cache_ttl_ms = cwd_cache_ttl_secs.saturating_mul(1000);
310        let cwd_cache_max_entries = std::env::var("CODEX_HELPER_SESSION_CWD_CACHE_MAX_ENTRIES")
311            .ok()
312            .and_then(|s| s.trim().parse::<usize>().ok())
313            .unwrap_or(2_000);
314
315        Arc::new(Self {
316            next_request_id: AtomicU64::new(1),
317            session_override_ttl_ms: ttl_ms,
318            session_cwd_cache_ttl_ms: cwd_cache_ttl_ms,
319            session_cwd_cache_max_entries: cwd_cache_max_entries,
320            session_effort_overrides: RwLock::new(HashMap::new()),
321            session_config_overrides: RwLock::new(HashMap::new()),
322            global_config_override: RwLock::new(None),
323            config_meta_overrides: RwLock::new(HashMap::new()),
324            session_cwd_cache: RwLock::new(HashMap::new()),
325            session_stats: RwLock::new(HashMap::new()),
326            active_requests: RwLock::new(HashMap::new()),
327            recent_finished: RwLock::new(VecDeque::new()),
328            usage_rollups: RwLock::new(HashMap::new()),
329            config_health: RwLock::new(HashMap::new()),
330            health_checks: RwLock::new(HashMap::new()),
331            lb_states,
332        })
333    }
334
335    pub async fn get_session_effort_override(&self, session_id: &str) -> Option<String> {
336        let guard = self.session_effort_overrides.read().await;
337        guard.get(session_id).map(|v| v.effort.clone())
338    }
339
340    pub async fn set_session_effort_override(
341        &self,
342        session_id: String,
343        effort: String,
344        now_ms: u64,
345    ) {
346        let mut guard = self.session_effort_overrides.write().await;
347        guard.insert(
348            session_id,
349            SessionEffortOverride {
350                effort,
351                updated_at_ms: now_ms,
352                last_seen_ms: now_ms,
353            },
354        );
355    }
356
357    pub async fn clear_session_effort_override(&self, session_id: &str) {
358        let mut guard = self.session_effort_overrides.write().await;
359        guard.remove(session_id);
360    }
361
362    pub async fn list_session_effort_overrides(&self) -> HashMap<String, String> {
363        let guard = self.session_effort_overrides.read().await;
364        guard
365            .iter()
366            .map(|(k, v)| (k.clone(), v.effort.clone()))
367            .collect()
368    }
369
370    pub async fn touch_session_override(&self, session_id: &str, now_ms: u64) {
371        let mut guard = self.session_effort_overrides.write().await;
372        if let Some(v) = guard.get_mut(session_id) {
373            v.last_seen_ms = now_ms;
374        }
375    }
376
377    pub async fn get_session_config_override(&self, session_id: &str) -> Option<String> {
378        let guard = self.session_config_overrides.read().await;
379        guard.get(session_id).map(|v| v.config_name.clone())
380    }
381
382    pub async fn set_session_config_override(
383        &self,
384        session_id: String,
385        config_name: String,
386        now_ms: u64,
387    ) {
388        let mut guard = self.session_config_overrides.write().await;
389        guard.insert(
390            session_id,
391            SessionConfigOverride {
392                config_name,
393                updated_at_ms: now_ms,
394                last_seen_ms: now_ms,
395            },
396        );
397    }
398
399    pub async fn clear_session_config_override(&self, session_id: &str) {
400        let mut guard = self.session_config_overrides.write().await;
401        guard.remove(session_id);
402    }
403
404    pub async fn list_session_config_overrides(&self) -> HashMap<String, String> {
405        let guard = self.session_config_overrides.read().await;
406        guard
407            .iter()
408            .map(|(k, v)| (k.clone(), v.config_name.clone()))
409            .collect()
410    }
411
412    pub async fn touch_session_config_override(&self, session_id: &str, now_ms: u64) {
413        let mut guard = self.session_config_overrides.write().await;
414        if let Some(v) = guard.get_mut(session_id) {
415            v.last_seen_ms = now_ms;
416        }
417    }
418
419    pub async fn get_global_config_override(&self) -> Option<String> {
420        let guard = self.global_config_override.read().await;
421        guard.clone()
422    }
423
424    pub async fn set_global_config_override(&self, config_name: String, _now_ms: u64) {
425        let mut guard = self.global_config_override.write().await;
426        *guard = Some(config_name);
427    }
428
429    pub async fn clear_global_config_override(&self) {
430        let mut guard = self.global_config_override.write().await;
431        *guard = None;
432    }
433
434    pub async fn set_config_enabled_override(
435        &self,
436        service_name: &str,
437        config_name: String,
438        enabled: bool,
439        now_ms: u64,
440    ) {
441        let mut guard = self.config_meta_overrides.write().await;
442        let per_service = guard.entry(service_name.to_string()).or_default();
443        let entry = per_service.entry(config_name).or_default();
444        entry.enabled = Some(enabled);
445        entry.updated_at_ms = now_ms;
446    }
447
448    pub async fn set_config_level_override(
449        &self,
450        service_name: &str,
451        config_name: String,
452        level: u8,
453        now_ms: u64,
454    ) {
455        let mut guard = self.config_meta_overrides.write().await;
456        let per_service = guard.entry(service_name.to_string()).or_default();
457        let entry = per_service.entry(config_name).or_default();
458        entry.level = Some(level.clamp(1, 10));
459        entry.updated_at_ms = now_ms;
460    }
461
462    pub async fn get_config_meta_overrides(
463        &self,
464        service_name: &str,
465    ) -> HashMap<String, (Option<bool>, Option<u8>)> {
466        let guard = self.config_meta_overrides.read().await;
467        guard
468            .get(service_name)
469            .map(|m| {
470                m.iter()
471                    .map(|(k, v)| (k.clone(), (v.enabled, v.level)))
472                    .collect::<HashMap<_, _>>()
473            })
474            .unwrap_or_default()
475    }
476
477    pub async fn record_config_health(
478        &self,
479        service_name: &str,
480        config_name: String,
481        health: ConfigHealth,
482    ) {
483        let mut guard = self.config_health.write().await;
484        let per_service = guard.entry(service_name.to_string()).or_default();
485        per_service.insert(config_name, health);
486    }
487
488    pub async fn get_config_health(&self, service_name: &str) -> HashMap<String, ConfigHealth> {
489        let guard = self.config_health.read().await;
490        guard.get(service_name).cloned().unwrap_or_default()
491    }
492
    /// Per-config snapshot of load-balancer upstream state for the UI.
    ///
    /// Returns an empty map when no LB state is shared with this instance.
    /// The shared `std::sync::Mutex` is recovered from poisoning; the guard
    /// is never held across an await point.
    pub async fn get_lb_view(&self) -> HashMap<String, LbConfigView> {
        let Some(lb_states) = self.lb_states.as_ref() else {
            return HashMap::new();
        };
        let mut map = match lb_states.lock() {
            Ok(m) => m,
            Err(e) => e.into_inner(),
        };

        let now = std::time::Instant::now();
        let mut out = HashMap::new();
        for (cfg_name, st) in map.iter_mut() {
            // Target length: the longest of the three parallel vectors.
            let len = st
                .failure_counts
                .len()
                .max(st.cooldown_until.len())
                .max(st.usage_exhausted.len());
            if len == 0 {
                continue;
            }

            // If a structural change left the parallel vectors at different
            // lengths, realign them once so the UI never reads
            // out-of-bounds or stale data.
            if st.failure_counts.len() != len {
                st.failure_counts.resize(len, 0);
            }
            if st.cooldown_until.len() != len {
                st.cooldown_until.resize(len, None);
            }
            if st.usage_exhausted.len() != len {
                st.usage_exhausted.resize(len, false);
            }

            let mut upstreams = Vec::with_capacity(len);
            for idx in 0..len {
                let failure_count = st.failure_counts.get(idx).copied().unwrap_or(0);
                // Remaining cooldown, reported only while still positive.
                let cooldown_remaining_secs = st
                    .cooldown_until
                    .get(idx)
                    .and_then(|v| *v)
                    .map(|until| until.saturating_duration_since(now).as_secs())
                    .filter(|&s| s > 0);
                let usage_exhausted = st.usage_exhausted.get(idx).copied().unwrap_or(false);
                upstreams.push(LbUpstreamView {
                    failure_count,
                    cooldown_remaining_secs,
                    usage_exhausted,
                });
            }

            out.insert(
                cfg_name.clone(),
                LbConfigView {
                    last_good_index: st.last_good_index,
                    upstreams,
                },
            );
        }
        out
    }
552
553    pub async fn list_health_checks(
554        &self,
555        service_name: &str,
556    ) -> HashMap<String, HealthCheckStatus> {
557        let guard = self.health_checks.read().await;
558        guard.get(service_name).cloned().unwrap_or_default()
559    }
560
561    pub async fn try_begin_health_check(
562        &self,
563        service_name: &str,
564        config_name: &str,
565        total: usize,
566        now_ms: u64,
567    ) -> bool {
568        let mut guard = self.health_checks.write().await;
569        let per_service = guard.entry(service_name.to_string()).or_default();
570        if let Some(existing) = per_service.get(config_name)
571            && !existing.done
572        {
573            return false;
574        }
575        per_service.insert(
576            config_name.to_string(),
577            HealthCheckStatus {
578                started_at_ms: now_ms,
579                updated_at_ms: now_ms,
580                total: total.min(u32::MAX as usize) as u32,
581                completed: 0,
582                ok: 0,
583                err: 0,
584                cancel_requested: false,
585                canceled: false,
586                done: false,
587                last_error: None,
588            },
589        );
590        true
591    }
592
593    pub async fn request_cancel_health_check(
594        &self,
595        service_name: &str,
596        config_name: &str,
597        now_ms: u64,
598    ) -> bool {
599        let mut guard = self.health_checks.write().await;
600        let Some(per_service) = guard.get_mut(service_name) else {
601            return false;
602        };
603        let Some(st) = per_service.get_mut(config_name) else {
604            return false;
605        };
606        if st.done {
607            return false;
608        }
609        st.cancel_requested = true;
610        st.updated_at_ms = now_ms;
611        true
612    }
613
614    pub async fn is_health_check_cancel_requested(
615        &self,
616        service_name: &str,
617        config_name: &str,
618    ) -> bool {
619        let guard = self.health_checks.read().await;
620        guard
621            .get(service_name)
622            .and_then(|m| m.get(config_name))
623            .is_some_and(|s| s.cancel_requested && !s.done)
624    }
625
626    pub async fn record_health_check_result(
627        &self,
628        service_name: &str,
629        config_name: &str,
630        now_ms: u64,
631        upstream: UpstreamHealth,
632    ) {
633        {
634            let mut guard = self.config_health.write().await;
635            let per_service = guard.entry(service_name.to_string()).or_default();
636            let entry = per_service
637                .entry(config_name.to_string())
638                .or_insert_with(|| ConfigHealth {
639                    checked_at_ms: now_ms,
640                    upstreams: Vec::new(),
641                });
642            entry.checked_at_ms = entry.checked_at_ms.max(now_ms);
643            entry.upstreams.push(upstream.clone());
644            if entry.upstreams.len() > Self::MAX_HEALTH_RECORDS_PER_CONFIG {
645                let extra = entry
646                    .upstreams
647                    .len()
648                    .saturating_sub(Self::MAX_HEALTH_RECORDS_PER_CONFIG);
649                if extra > 0 {
650                    entry.upstreams.drain(0..extra);
651                }
652            }
653        }
654
655        let mut guard = self.health_checks.write().await;
656        let per_service = guard.entry(service_name.to_string()).or_default();
657        let st = per_service.entry(config_name.to_string()).or_default();
658        st.updated_at_ms = now_ms;
659        st.completed = st.completed.saturating_add(1);
660        match upstream.ok {
661            Some(true) => st.ok = st.ok.saturating_add(1),
662            Some(false) => {
663                st.err = st.err.saturating_add(1);
664                if st.last_error.is_none() {
665                    st.last_error = upstream.error.clone();
666                }
667            }
668            None => {}
669        }
670    }
671
672    pub async fn finish_health_check(
673        &self,
674        service_name: &str,
675        config_name: &str,
676        now_ms: u64,
677        canceled: bool,
678    ) {
679        let mut guard = self.health_checks.write().await;
680        let per_service = guard.entry(service_name.to_string()).or_default();
681        let st = per_service.entry(config_name.to_string()).or_default();
682        st.updated_at_ms = now_ms;
683        st.canceled = canceled;
684        st.done = true;
685    }
686
687    pub async fn get_usage_rollup_view(
688        &self,
689        service_name: &str,
690        top_n: usize,
691        days: usize,
692    ) -> UsageRollupView {
693        let guard = self.usage_rollups.read().await;
694        let Some(rollup) = guard.get(service_name) else {
695            return UsageRollupView::default();
696        };
697
698        fn day_series(map: &HashMap<i32, UsageBucket>, days: usize) -> Vec<(i32, UsageBucket)> {
699            let mut out = map.iter().map(|(k, v)| (*k, v.clone())).collect::<Vec<_>>();
700            out.sort_by_key(|(k, _)| *k);
701            if out.len() > days {
702                out = out[out.len().saturating_sub(days)..].to_vec();
703            }
704            out
705        }
706
707        let mut by_day = rollup
708            .by_day
709            .iter()
710            .map(|(k, v)| (*k, v.clone()))
711            .collect::<Vec<_>>();
712        by_day.sort_by_key(|(k, _)| *k);
713        if by_day.len() > days {
714            by_day = by_day[by_day.len().saturating_sub(days)..].to_vec();
715        }
716
717        let mut by_config = rollup
718            .by_config
719            .iter()
720            .map(|(k, v)| (k.clone(), v.clone()))
721            .collect::<Vec<_>>();
722        by_config.sort_by_key(|(_, v)| std::cmp::Reverse(v.usage.total_tokens));
723        by_config.truncate(top_n);
724
725        let mut by_provider = rollup
726            .by_provider
727            .iter()
728            .map(|(k, v)| (k.clone(), v.clone()))
729            .collect::<Vec<_>>();
730        by_provider.sort_by_key(|(_, v)| std::cmp::Reverse(v.usage.total_tokens));
731        by_provider.truncate(top_n);
732
733        let mut by_config_day = HashMap::new();
734        for (name, _) in &by_config {
735            if let Some(m) = rollup.by_config_day.get(name) {
736                by_config_day.insert(name.clone(), day_series(m, days));
737            } else {
738                by_config_day.insert(name.clone(), Vec::new());
739            }
740        }
741
742        let mut by_provider_day = HashMap::new();
743        for (name, _) in &by_provider {
744            if let Some(m) = rollup.by_provider_day.get(name) {
745                by_provider_day.insert(name.clone(), day_series(m, days));
746            } else {
747                by_provider_day.insert(name.clone(), Vec::new());
748            }
749        }
750
751        UsageRollupView {
752            since_start: rollup.since_start.clone(),
753            by_day,
754            by_config,
755            by_config_day,
756            by_provider,
757            by_provider_day,
758        }
759    }
760
761    pub async fn replay_usage_from_requests_log(
762        &self,
763        service_name: &str,
764        log_path: PathBuf,
765        base_url_to_provider_id: HashMap<String, String>,
766    ) -> usize {
767        let enabled = std::env::var("CODEX_HELPER_USAGE_REPLAY_ON_STARTUP")
768            .ok()
769            .map(|v| {
770                matches!(
771                    v.trim().to_ascii_lowercase().as_str(),
772                    "1" | "true" | "yes" | "y" | "on"
773                )
774            })
775            .unwrap_or(true);
776        if !enabled {
777            return 0;
778        }
779
780        let already_has_data = {
781            let guard = self.usage_rollups.read().await;
782            guard
783                .get(service_name)
784                .is_some_and(|r| r.since_start.requests_total > 0)
785        };
786        if already_has_data {
787            return 0;
788        }
789
790        if !log_path.exists() {
791            return 0;
792        }
793
794        let max_bytes = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_BYTES")
795            .ok()
796            .and_then(|s| s.trim().parse::<usize>().ok())
797            .filter(|&n| n > 0)
798            .unwrap_or(8 * 1024 * 1024);
799        let max_lines = std::env::var("CODEX_HELPER_USAGE_REPLAY_MAX_LINES")
800            .ok()
801            .and_then(|s| s.trim().parse::<usize>().ok())
802            .filter(|&n| n > 0)
803            .unwrap_or(20_000);
804
805        let mut file = match std::fs::File::open(&log_path) {
806            Ok(f) => f,
807            Err(_) => return 0,
808        };
809        let len: u64 = file.metadata().map(|m| m.len()).unwrap_or_default();
810        let start = len.saturating_sub(max_bytes as u64);
811        if file.seek(SeekFrom::Start(start)).is_err() {
812            return 0;
813        }
814        let mut buf = Vec::new();
815        if file.read_to_end(&mut buf).is_err() {
816            return 0;
817        }
818        if start > 0 {
819            if let Some(pos) = buf.iter().position(|b| *b == b'\n') {
820                buf = buf[pos + 1..].to_vec();
821            } else {
822                return 0;
823            }
824        }
825
826        let text = match std::str::from_utf8(&buf) {
827            Ok(s) => s,
828            Err(_) => return 0,
829        };
830        let lines = text
831            .lines()
832            .map(|l| l.trim())
833            .filter(|l| !l.is_empty())
834            .collect::<Vec<_>>();
835        let start_idx = lines.len().saturating_sub(max_lines);
836
837        let mut events = Vec::new();
838        for line in &lines[start_idx..] {
839            let Ok(v) = serde_json::from_str::<JsonValue>(line) else {
840                continue;
841            };
842            let Some(svc) = v.get("service").and_then(|x| x.as_str()) else {
843                continue;
844            };
845            if svc != service_name {
846                continue;
847            }
848
849            let ended_at_ms = v.get("timestamp_ms").and_then(|x| x.as_u64()).unwrap_or(0);
850            let status_code = v.get("status_code").and_then(|x| x.as_u64()).unwrap_or(0) as u16;
851            let duration_ms = v.get("duration_ms").and_then(|x| x.as_u64()).unwrap_or(0);
852            let config_name = v
853                .get("config_name")
854                .and_then(|x| x.as_str())
855                .unwrap_or("-")
856                .to_string();
857            let upstream_base_url = v
858                .get("upstream_base_url")
859                .and_then(|x| x.as_str())
860                .unwrap_or("-")
861                .to_string();
862            let provider_id = v
863                .get("provider_id")
864                .and_then(|x| x.as_str())
865                .map(|s| s.to_string())
866                .or_else(|| base_url_to_provider_id.get(&upstream_base_url).cloned())
867                .unwrap_or_else(|| "-".to_string());
868            let usage = v
869                .get("usage")
870                .and_then(|u| serde_json::from_value::<UsageMetrics>(u.clone()).ok());
871            let ttfb_ms = v.get("ttfb_ms").and_then(|x| x.as_u64());
872
873            events.push((
874                ended_at_ms,
875                status_code,
876                duration_ms,
877                config_name,
878                provider_id,
879                usage,
880                ttfb_ms,
881            ));
882        }
883
884        if events.is_empty() {
885            return 0;
886        }
887
888        let mut guard = self.usage_rollups.write().await;
889        let rollup = guard.entry(service_name.to_string()).or_default();
890        for (ended_at_ms, status_code, duration_ms, cfg_key, provider_key, usage, ttfb_ms) in
891            events.iter()
892        {
893            let day = (*ended_at_ms / 86_400_000) as i32;
894            rollup
895                .since_start
896                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
897            rollup.by_day.entry(day).or_default().record(
898                *status_code,
899                *duration_ms,
900                usage.as_ref(),
901                *ttfb_ms,
902            );
903            rollup.by_config.entry(cfg_key.clone()).or_default().record(
904                *status_code,
905                *duration_ms,
906                usage.as_ref(),
907                *ttfb_ms,
908            );
909            rollup
910                .by_config_day
911                .entry(cfg_key.clone())
912                .or_default()
913                .entry(day)
914                .or_default()
915                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
916            rollup
917                .by_provider
918                .entry(provider_key.clone())
919                .or_default()
920                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
921            rollup
922                .by_provider_day
923                .entry(provider_key.clone())
924                .or_default()
925                .entry(day)
926                .or_default()
927                .record(*status_code, *duration_ms, usage.as_ref(), *ttfb_ms);
928        }
929
930        events.len()
931    }
932
    /// Resolves the working directory recorded for a Codex session id,
    /// consulting (and maintaining) the in-memory cwd cache when enabled.
    ///
    /// Returns `None` when the session cannot be found or has no cwd.
    pub async fn resolve_session_cwd(&self, session_id: &str) -> Option<String> {
        // A configured max of 0 disables the cache entirely: always go to disk.
        if self.session_cwd_cache_max_entries == 0 {
            return sessions::find_codex_session_cwd_by_id(session_id)
                .await
                .ok()
                .flatten();
        }

        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        {
            // Fast path: look up under a read lock; only take the write lock
            // (after dropping the read guard) to bump last_seen_ms for LRU.
            let guard = self.session_cwd_cache.read().await;
            if let Some(v) = guard.get(session_id) {
                let out = v.cwd.clone();
                drop(guard);
                let mut guard = self.session_cwd_cache.write().await;
                // The entry may have been pruned between dropping the read
                // guard and acquiring the write guard; skip silently then.
                if let Some(v) = guard.get_mut(session_id) {
                    v.last_seen_ms = now_ms;
                }
                return out;
            }
        }

        // Cache miss: resolve from disk and record last_seen.

        let resolved = sessions::find_codex_session_cwd_by_id(session_id)
            .await
            .ok()
            .flatten();

        // Negative results (None) are cached too, so repeated lookups for an
        // unknown session do not rescan the disk every time.
        let mut guard = self.session_cwd_cache.write().await;
        guard.insert(
            session_id.to_string(),
            SessionCwdCacheEntry {
                cwd: resolved.clone(),
                last_seen_ms: now_ms,
            },
        );
        resolved
    }
976
977    #[allow(clippy::too_many_arguments)]
978    pub async fn begin_request(
979        &self,
980        service: &str,
981        method: &str,
982        path: &str,
983        session_id: Option<String>,
984        cwd: Option<String>,
985        model: Option<String>,
986        reasoning_effort: Option<String>,
987        started_at_ms: u64,
988    ) -> u64 {
989        let id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
990        let req = ActiveRequest {
991            id,
992            session_id,
993            cwd,
994            model,
995            reasoning_effort,
996            config_name: None,
997            provider_id: None,
998            upstream_base_url: None,
999            service: service.to_string(),
1000            method: method.to_string(),
1001            path: path.to_string(),
1002            started_at_ms,
1003        };
1004        let mut guard = self.active_requests.write().await;
1005        guard.insert(id, req);
1006        id
1007    }
1008
1009    pub async fn update_request_route(
1010        &self,
1011        request_id: u64,
1012        config_name: String,
1013        provider_id: Option<String>,
1014        upstream_base_url: String,
1015    ) {
1016        let mut guard = self.active_requests.write().await;
1017        let Some(req) = guard.get_mut(&request_id) else {
1018            return;
1019        };
1020        req.config_name = Some(config_name);
1021        req.provider_id = provider_id;
1022        req.upstream_base_url = Some(upstream_base_url);
1023    }
1024
1025    pub async fn finish_request(&self, params: FinishRequestParams) {
1026        let mut active = self.active_requests.write().await;
1027        let Some(req) = active.remove(&params.id) else {
1028            return;
1029        };
1030
1031        let finished = FinishedRequest {
1032            id: params.id,
1033            session_id: req.session_id,
1034            cwd: req.cwd,
1035            model: req.model,
1036            reasoning_effort: req.reasoning_effort,
1037            config_name: req.config_name,
1038            provider_id: req.provider_id,
1039            upstream_base_url: req.upstream_base_url,
1040            usage: params.usage.clone(),
1041            retry: params.retry,
1042            service: req.service,
1043            method: req.method,
1044            path: req.path,
1045            status_code: params.status_code,
1046            duration_ms: params.duration_ms,
1047            ttfb_ms: params.ttfb_ms,
1048            ended_at_ms: params.ended_at_ms,
1049        };
1050
1051        {
1052            let day = (finished.ended_at_ms / 86_400_000) as i32;
1053            let cfg_key = finished
1054                .config_name
1055                .clone()
1056                .unwrap_or_else(|| "-".to_string());
1057            let provider_key = finished
1058                .provider_id
1059                .clone()
1060                .unwrap_or_else(|| "-".to_string());
1061
1062            let mut rollups = self.usage_rollups.write().await;
1063            let rollup = rollups.entry(finished.service.clone()).or_default();
1064            rollup.since_start.record(
1065                finished.status_code,
1066                finished.duration_ms,
1067                finished.usage.as_ref(),
1068                finished.ttfb_ms,
1069            );
1070            rollup.by_day.entry(day).or_default().record(
1071                finished.status_code,
1072                finished.duration_ms,
1073                finished.usage.as_ref(),
1074                finished.ttfb_ms,
1075            );
1076            rollup.by_config.entry(cfg_key.clone()).or_default().record(
1077                finished.status_code,
1078                finished.duration_ms,
1079                finished.usage.as_ref(),
1080                finished.ttfb_ms,
1081            );
1082            rollup
1083                .by_config_day
1084                .entry(cfg_key)
1085                .or_default()
1086                .entry(day)
1087                .or_default()
1088                .record(
1089                    finished.status_code,
1090                    finished.duration_ms,
1091                    finished.usage.as_ref(),
1092                    finished.ttfb_ms,
1093                );
1094
1095            rollup
1096                .by_provider
1097                .entry(provider_key.clone())
1098                .or_default()
1099                .record(
1100                    finished.status_code,
1101                    finished.duration_ms,
1102                    finished.usage.as_ref(),
1103                    finished.ttfb_ms,
1104                );
1105            rollup
1106                .by_provider_day
1107                .entry(provider_key)
1108                .or_default()
1109                .entry(day)
1110                .or_default()
1111                .record(
1112                    finished.status_code,
1113                    finished.duration_ms,
1114                    finished.usage.as_ref(),
1115                    finished.ttfb_ms,
1116                );
1117        }
1118
1119        if let Some(sid) = finished.session_id.as_deref() {
1120            let mut stats = self.session_stats.write().await;
1121            let entry = stats.entry(sid.to_string()).or_default();
1122            entry.turns_total = entry.turns_total.saturating_add(1);
1123            entry.last_model = finished.model.clone().or(entry.last_model.clone());
1124            entry.last_reasoning_effort = finished
1125                .reasoning_effort
1126                .clone()
1127                .or(entry.last_reasoning_effort.clone());
1128            entry.last_provider_id = finished
1129                .provider_id
1130                .clone()
1131                .or(entry.last_provider_id.clone());
1132            entry.last_config_name = finished
1133                .config_name
1134                .clone()
1135                .or(entry.last_config_name.clone());
1136            if let Some(u) = finished.usage.as_ref() {
1137                entry.last_usage = Some(u.clone());
1138                entry.total_usage.add_assign(u);
1139                entry.turns_with_usage = entry.turns_with_usage.saturating_add(1);
1140            }
1141            entry.last_status = Some(finished.status_code);
1142            entry.last_duration_ms = Some(finished.duration_ms);
1143            entry.last_ended_at_ms = Some(finished.ended_at_ms);
1144            entry.last_seen_ms = finished.ended_at_ms;
1145        }
1146
1147        let mut recent = self.recent_finished.write().await;
1148        recent.push_front(finished);
1149        while recent.len() > recent_finished_max() {
1150            recent.pop_back();
1151        }
1152    }
1153
1154    pub async fn list_active_requests(&self) -> Vec<ActiveRequest> {
1155        let guard = self.active_requests.read().await;
1156        let mut vec = guard.values().cloned().collect::<Vec<_>>();
1157        vec.sort_by_key(|r| r.started_at_ms);
1158        vec
1159    }
1160
1161    pub async fn list_recent_finished(&self, limit: usize) -> Vec<FinishedRequest> {
1162        let guard = self.recent_finished.read().await;
1163        guard.iter().take(limit).cloned().collect()
1164    }
1165
1166    pub async fn list_session_stats(&self) -> HashMap<String, SessionStats> {
1167        let guard = self.session_stats.read().await;
1168        guard.clone()
1169    }
1170
1171    pub fn spawn_cleanup_task(state: Arc<Self>) {
1172        // Run periodically; no need to be super frequent.
1173        tokio::spawn(async move {
1174            let mut tick = interval(Duration::from_secs(30));
1175            loop {
1176                tick.tick().await;
1177                state.prune_periodic().await;
1178            }
1179        });
1180    }
1181
1182    async fn prune_periodic(&self) {
1183        let now_ms = std::time::SystemTime::now()
1184            .duration_since(std::time::UNIX_EPOCH)
1185            .map(|d| d.as_millis() as u64)
1186            .unwrap_or(0);
1187
1188        // Collect active session_ids to avoid clearing overrides for currently running requests.
1189        let active = self.active_requests.read().await;
1190        let mut active_sessions: HashMap<String, ()> = HashMap::new();
1191        for req in active.values() {
1192            if let Some(sid) = req.session_id.as_deref() {
1193                active_sessions.insert(sid.to_string(), ());
1194            }
1195        }
1196
1197        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1198            let cutoff_override = now_ms - self.session_override_ttl_ms;
1199            let mut overrides = self.session_effort_overrides.write().await;
1200            overrides.retain(|sid, v| {
1201                if active_sessions.contains_key(sid) {
1202                    return true;
1203                }
1204                v.last_seen_ms >= cutoff_override
1205            });
1206        }
1207
1208        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1209            let cutoff_override = now_ms - self.session_override_ttl_ms;
1210            let mut overrides = self.session_config_overrides.write().await;
1211            overrides.retain(|sid, v| {
1212                if active_sessions.contains_key(sid) {
1213                    return true;
1214                }
1215                v.last_seen_ms >= cutoff_override
1216            });
1217        }
1218
1219        // Keep a bounded number of days of rollup data to avoid unbounded growth.
1220        let keep_days: i32 = std::env::var("CODEX_HELPER_USAGE_ROLLUP_KEEP_DAYS")
1221            .ok()
1222            .and_then(|s| s.trim().parse::<i32>().ok())
1223            .filter(|&n| n > 0)
1224            .unwrap_or(60);
1225        let now_day = (now_ms / 86_400_000) as i32;
1226        let cutoff_day = now_day.saturating_sub(keep_days);
1227        let mut rollups = self.usage_rollups.write().await;
1228        for rollup in rollups.values_mut() {
1229            rollup.by_day.retain(|day, _| *day >= cutoff_day);
1230            rollup.by_config_day.retain(|_, m| {
1231                m.retain(|day, _| *day >= cutoff_day);
1232                !m.is_empty()
1233            });
1234            rollup.by_provider_day.retain(|_, m| {
1235                m.retain(|day, _| *day >= cutoff_day);
1236                !m.is_empty()
1237            });
1238        }
1239
1240        let cutoff_cwd =
1241            if self.session_cwd_cache_ttl_ms == 0 || now_ms < self.session_cwd_cache_ttl_ms {
1242                0
1243            } else {
1244                now_ms - self.session_cwd_cache_ttl_ms
1245            };
1246        self.prune_session_cwd_cache(&active_sessions, cutoff_cwd)
1247            .await;
1248
1249        if self.session_override_ttl_ms > 0 && now_ms >= self.session_override_ttl_ms {
1250            let cutoff_stats = now_ms - self.session_override_ttl_ms;
1251            let mut stats = self.session_stats.write().await;
1252            stats.retain(|sid, v| {
1253                active_sessions.contains_key(sid) || v.last_seen_ms >= cutoff_stats
1254            });
1255        }
1256    }
1257
1258    async fn prune_session_cwd_cache(&self, active_sessions: &HashMap<String, ()>, cutoff: u64) {
1259        if self.session_cwd_cache_max_entries == 0 {
1260            return;
1261        }
1262        let mut cache = self.session_cwd_cache.write().await;
1263
1264        if self.session_cwd_cache_ttl_ms > 0 {
1265            cache.retain(|sid, v| {
1266                if active_sessions.contains_key(sid) {
1267                    return true;
1268                }
1269                v.last_seen_ms >= cutoff
1270            });
1271        }
1272
1273        let max = self.session_cwd_cache_max_entries;
1274        if max == 0 || cache.len() <= max {
1275            return;
1276        }
1277
1278        // Drop least-recently-seen entries first.
1279        let mut keys = cache
1280            .iter()
1281            .map(|(sid, v)| (sid.clone(), v.last_seen_ms))
1282            .collect::<Vec<_>>();
1283        keys.sort_by_key(|(_, t)| *t);
1284        let remove_count = keys.len().saturating_sub(max);
1285        for (sid, _) in keys.into_iter().take(remove_count) {
1286            cache.remove(&sid);
1287        }
1288    }
1289}