// cc_switch/daemon/lifecycle.rs

//! Daemon main loop: spawn proxies, write state, supervise, shutdown.
2
3use crate::config::ConfigStorage;
4use crate::daemon::aggregate;
5use crate::daemon::aggregate::state::AliasMap;
6use crate::daemon::pidfile::Pidfile;
7use crate::daemon::state::{DaemonState, ProxyEntry};
8use anyhow::{Context, Result};
9use ccs_proxy::store::FsStore;
10use std::collections::BTreeSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13use url::Url;
14
/// A `(provider, upstream URL)` pair identifying one proxy target.
pub type Upstream = (String, String);

/// Everything the daemon needs to know before it starts.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and copied;
/// every field is a plain standard-library type.
#[derive(Debug, Clone)]
pub struct LifecycleConfig {
    /// File the daemon state is saved to.
    pub state_path: PathBuf,
    /// Pidfile acquired at startup and released at shutdown.
    pub pidfile_path: PathBuf,
    /// Directory under which each proxy gets its own data directory.
    pub data_root: PathBuf,
    /// Upstreams to start a proxy for.
    pub upstreams: Vec<Upstream>,
    /// Selects foreground vs. background logging mode.
    pub foreground: bool,
}
24
25impl LifecycleConfig {
26    pub fn from_storage(storage: &ConfigStorage, foreground: bool) -> Result<Self> {
27        let home = dirs::home_dir().context("could not find home directory")?;
28        let cc_switch_dir = home.join(".cc-switch");
29        std::fs::create_dir_all(&cc_switch_dir)
30            .with_context(|| format!("failed to create {}", cc_switch_dir.display()))?;
31
32        let upstreams = dedupe_upstreams(storage);
33
34        Ok(Self {
35            state_path: cc_switch_dir.join("daemon-state.json"),
36            pidfile_path: cc_switch_dir.join("daemon.pid"),
37            data_root: cc_switch_dir.join("daemon-data"),
38            upstreams,
39            foreground,
40        })
41    }
42}
43
44fn dedupe_upstreams(storage: &ConfigStorage) -> Vec<Upstream> {
45    let mut seen = BTreeSet::new();
46    let mut result = Vec::new();
47    for config in storage.configurations.values() {
48        if config.url.is_empty() {
49            continue;
50        }
51        let key = ("claude".to_string(), config.url.clone());
52        if seen.insert(key.clone()) {
53            result.push(key);
54        }
55    }
56    result
57}
58
/// Returns an 8-character lowercase hex digest of `url`, taken from the low
/// 32 bits of the standard library's default hasher.
///
/// NOTE(review): the result is used as a directory name under the data root,
/// but the standard library does not guarantee `DefaultHasher`'s algorithm
/// across Rust releases — confirm whether on-disk names must stay stable
/// across toolchain upgrades.
fn upstream_hash(url: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    const LOW_32_BITS: u64 = 0xFFFF_FFFF;

    let mut state = DefaultHasher::new();
    url.hash(&mut state);
    let digest = state.finish() & LOW_32_BITS;
    format!("{digest:08x}")
}
65
66pub fn run_daemon_blocking(
67    cfg: LifecycleConfig,
68    log_level: Option<String>,
69    verbose: u8,
70) -> Result<()> {
71    let env_val = std::env::var("CCS_LOG").ok();
72    let level = crate::daemon::logging::resolve_log_level(
73        log_level.as_deref(),
74        verbose,
75        env_val.as_deref(),
76    );
77    let mode = if cfg.foreground {
78        crate::daemon::logging::LogMode::Foreground
79    } else {
80        crate::daemon::logging::LogMode::Background
81    };
82
83    crate::daemon::logging::cleanup_old_logs(7);
84    let _guard = crate::daemon::logging::init_tracing(mode, level);
85
86    let rt = tokio::runtime::Builder::new_multi_thread()
87        .enable_all()
88        .build()
89        .context("failed to build tokio runtime")?;
90
91    rt.block_on(run_daemon_async(cfg))
92}
93
94async fn run_daemon_async(cfg: LifecycleConfig) -> Result<()> {
95    let pidfile = Pidfile::new(cfg.pidfile_path.clone());
96    pidfile
97        .acquire()
98        .context("failed to acquire pidfile — is another daemon already running?")?;
99
100    std::fs::create_dir_all(&cfg.data_root)
101        .with_context(|| format!("failed to create data_root {}", cfg.data_root.display()))?;
102
103    let mut handles: Vec<ccs_proxy::ProxyHandle> = Vec::new();
104    let mut proxy_entries: Vec<ProxyEntry> = Vec::new();
105
106    for (_provider, upstream_url) in &cfg.upstreams {
107        let parsed_url = match Url::parse(upstream_url) {
108            Ok(u) => u,
109            Err(err) => {
110                tracing::warn!(upstream = %upstream_url, error = %err, "skipping invalid upstream URL");
111                continue;
112            }
113        };
114
115        let hash = upstream_hash(upstream_url);
116        let data_dir = cfg.data_root.join(&hash);
117
118        let mut serve_cfg = ccs_proxy::ServeConfig::new(
119            ccs_proxy::ProviderKind::Claude,
120            parsed_url,
121            data_dir.clone(),
122        );
123        serve_cfg.api_server = false;
124
125        match ccs_proxy::serve(serve_cfg).await {
126            Ok(handle) => {
127                proxy_entries.push(ProxyEntry {
128                    provider: "claude".to_string(),
129                    upstream: upstream_url.clone(),
130                    proxy_port: handle.proxy_port,
131                    api_port: handle.api_port,
132                    data_dir,
133                    started_at: chrono::Utc::now().to_rfc3339(),
134                    restart_count: 0,
135                });
136                handles.push(handle);
137            }
138            Err(err) => {
139                tracing::error!(upstream = %upstream_url, error = %err, "failed to start proxy");
140            }
141        }
142    }
143
144    // Build AliasMap from current storage
145    let storage = ConfigStorage::load().unwrap_or_default();
146    let alias_map = Arc::new(AliasMap::from_storage(&storage));
147
148    // Collect stores and event senders for aggregate
149    let agg_stores: Vec<_> = proxy_entries
150        .iter()
151        .map(|entry| {
152            let store = Arc::new(
153                FsStore::open(entry.data_dir.clone())
154                    .expect("store open should succeed — dir already created by proxy"),
155            );
156            (entry.upstream.clone(), store)
157        })
158        .collect();
159
160    let agg_events: Vec<_> = handles
161        .iter()
162        .zip(proxy_entries.iter())
163        .map(|(handle, entry)| (entry.upstream.clone(), handle.event_sender().clone()))
164        .collect();
165
166    // Start aggregate server (hold handle alive for daemon lifetime)
167    let agg_handle = match aggregate::serve(agg_stores, agg_events, alias_map, 0).await {
168        Ok(handle) => {
169            tracing::info!(port = handle.port, "aggregate dashboard available");
170            Some(handle)
171        }
172        Err(err) => {
173            tracing::warn!(error = %err, "failed to start aggregate server — proxies still work");
174            None
175        }
176    };
177    let agg_port = agg_handle.as_ref().map(|h| h.port);
178
179    let state = DaemonState {
180        schema_version: 2,
181        pid: std::process::id(),
182        started_at: chrono::Utc::now().to_rfc3339(),
183        stopped_at: None,
184        data_root: cfg.data_root.clone(),
185        agg_port,
186        proxies: proxy_entries.clone(),
187    };
188    state
189        .save(&cfg.state_path)
190        .context("failed to write initial daemon state")?;
191
192    tracing::info!(
193        pid = state.pid,
194        proxy_count = handles.len(),
195        "daemon started"
196    );
197
198    // Wait for shutdown signal.
199    let shutdown = async {
200        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
201            .expect("failed to install SIGTERM handler");
202        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
203            .expect("failed to install SIGINT handler");
204        tokio::select! {
205            _ = sigterm.recv() => {}
206            _ = sigint.recv() => {}
207        }
208    };
209
210    // Supervisor loop: check handle health every 30s, respawn if needed.
211    let supervisor = supervisor_loop(&mut handles, &mut proxy_entries, &cfg);
212
213    tokio::select! {
214        _ = shutdown => {
215            tracing::info!("daemon shutting down");
216        }
217        _ = supervisor => {
218            // supervisor_loop runs forever unless cancelled
219        }
220    }
221
222    // Graceful shutdown: stop aggregate and proxy servers.
223    if let Some(agg) = agg_handle {
224        agg.shutdown().await;
225    }
226    for handle in handles {
227        handle.shutdown().await;
228    }
229
230    // Write final state with stopped_at.
231    let final_state = DaemonState {
232        schema_version: 2,
233        pid: std::process::id(),
234        started_at: state.started_at,
235        stopped_at: Some(chrono::Utc::now().to_rfc3339()),
236        data_root: cfg.data_root,
237        agg_port: None,
238        proxies: proxy_entries,
239    };
240    let _ = final_state.save(&cfg.state_path);
241
242    // Remove pidfile.
243    let _ = pidfile.release();
244
245    tracing::info!("daemon stopped");
246    Ok(())
247}
248
249async fn supervisor_loop(
250    handles: &mut [ccs_proxy::ProxyHandle],
251    entries: &mut [ProxyEntry],
252    cfg: &LifecycleConfig,
253) {
254    loop {
255        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
256
257        for i in 0..handles.len() {
258            let finished = handles[i].is_finished();
259
260            if finished {
261                tracing::warn!(upstream = %entries[i].upstream, "proxy exited unexpectedly, respawning");
262
263                let parsed_url = match Url::parse(&entries[i].upstream) {
264                    Ok(u) => u,
265                    Err(_) => continue,
266                };
267
268                let serve_cfg = ccs_proxy::ServeConfig::new(
269                    ccs_proxy::ProviderKind::Claude,
270                    parsed_url,
271                    entries[i].data_dir.clone(),
272                );
273
274                match ccs_proxy::serve(serve_cfg).await {
275                    Ok(new_handle) => {
276                        entries[i].proxy_port = new_handle.proxy_port;
277                        entries[i].api_port = new_handle.api_port;
278                        entries[i].restart_count += 1;
279                        entries[i].started_at = chrono::Utc::now().to_rfc3339();
280                        handles[i] = new_handle;
281
282                        let state = DaemonState {
283                            schema_version: 2,
284                            pid: std::process::id(),
285                            started_at: entries.first().map_or_else(
286                                || chrono::Utc::now().to_rfc3339(),
287                                |e| e.started_at.clone(),
288                            ),
289                            stopped_at: None,
290                            data_root: cfg.data_root.clone(),
291                            agg_port: None,
292                            proxies: entries.to_vec(),
293                        };
294                        let _ = state.save(&cfg.state_path);
295                    }
296                    Err(err) => {
297                        tracing::error!(upstream = %entries[i].upstream, error = %err, "failed to respawn proxy");
298                    }
299                }
300            }
301        }
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::types::Configuration;
    use std::collections::BTreeMap;

    /// Builds a configuration for `alias` that points at `url`, with a dummy
    /// token and every optional setting left unset.
    fn config_for(alias: String, url: &str) -> Configuration {
        Configuration {
            alias_name: alias,
            token: "sk-test".to_string(),
            url: url.to_string(),
            model: None,
            small_fast_model: None,
            max_thinking_tokens: None,
            api_timeout_ms: None,
            claude_code_disable_nonessential_traffic: None,
            anthropic_default_sonnet_model: None,
            anthropic_default_opus_model: None,
            anthropic_default_haiku_model: None,
            claude_code_experimental_agent_teams: None,
            claude_code_disable_1m_context: None,
            claude_code_subagent_model: None,
            claude_code_disable_nonstreaming_fallback: None,
            claude_code_effort_level: None,
            disable_prompt_caching: None,
            claude_code_disable_experimental_betas: None,
            disable_autoupdater: None,
        }
    }

    /// Builds a storage with one configuration per URL, keyed `alias0`,
    /// `alias1`, … in input order.
    fn make_storage(urls: &[&str]) -> ConfigStorage {
        let configurations: BTreeMap<_, _> = urls
            .iter()
            .enumerate()
            .map(|(i, url)| {
                let alias = format!("alias{i}");
                (alias.clone(), config_for(alias, url))
            })
            .collect();

        ConfigStorage {
            configurations,
            claude_settings_dir: None,
            default_storage_mode: None,
            codex_configurations: None,
        }
    }

    /// Extracts just the URL half of each upstream pair.
    fn urls_of(upstreams: Vec<Upstream>) -> Vec<String> {
        upstreams.into_iter().map(|(_, url)| url).collect()
    }

    #[test]
    fn dedupe_upstreams_removes_duplicates() {
        let storage = make_storage(&[
            "https://api.anthropic.com",
            "https://api.anthropic.com",
            "https://other.example.com/v1",
        ]);
        let urls = urls_of(dedupe_upstreams(&storage));
        assert_eq!(
            urls,
            ["https://api.anthropic.com", "https://other.example.com/v1"]
        );
    }

    #[test]
    fn dedupe_upstreams_skips_empty_urls() {
        let storage = make_storage(&["", "https://api.anthropic.com"]);
        let urls = urls_of(dedupe_upstreams(&storage));
        assert_eq!(urls, ["https://api.anthropic.com"]);
    }

    #[test]
    fn upstream_hash_is_deterministic() {
        let first = upstream_hash("https://api.anthropic.com");
        let second = upstream_hash("https://api.anthropic.com");
        assert_eq!(first, second);
        assert_eq!(first.len(), 8);
    }

    #[test]
    fn upstream_hash_differs_for_different_urls() {
        let anthropic = upstream_hash("https://api.anthropic.com");
        let other = upstream_hash("https://other.example.com");
        assert_ne!(anthropic, other);
    }
}