Skip to main content

cc_switch/daemon/
lifecycle.rs

1//! Daemon main loop: spawn proxies, write state, supervise, shutdown.
2
3use crate::config::ConfigStorage;
4use crate::daemon::aggregate;
5use crate::daemon::aggregate::state::AliasMap;
6use crate::daemon::pidfile::Pidfile;
7use crate::daemon::state::{DaemonState, ProxyEntry};
8use anyhow::{Context, Result};
9use ccs_proxy::store::FsStore;
10use std::collections::BTreeSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13use url::Url;
14
/// A `(provider, upstream URL)` pair identifying one proxy target.
pub type Upstream = (String, String);

/// Everything the daemon needs for one run: where it persists its state, pidfile and
/// per-proxy data, which upstreams it proxies, and whether it runs in the foreground.
#[derive(Debug, Clone)]
pub struct LifecycleConfig {
    /// File the daemon state is saved to.
    pub state_path: PathBuf,
    /// Pidfile acquired at startup and released at shutdown.
    pub pidfile_path: PathBuf,
    /// Directory under which each proxy gets its own data directory.
    pub data_root: PathBuf,
    /// Upstreams to start a proxy for.
    pub upstreams: Vec<Upstream>,
    /// Selects foreground (vs. background) logging mode.
    pub foreground: bool,
}
24
25impl LifecycleConfig {
26    pub fn from_storage(storage: &ConfigStorage, foreground: bool) -> Result<Self> {
27        let home = dirs::home_dir().context("could not find home directory")?;
28        let cc_switch_dir = home.join(".cc-switch");
29        std::fs::create_dir_all(&cc_switch_dir)
30            .with_context(|| format!("failed to create {}", cc_switch_dir.display()))?;
31
32        let upstreams = dedupe_upstreams(storage);
33
34        Ok(Self {
35            state_path: cc_switch_dir.join("daemon-state.json"),
36            pidfile_path: cc_switch_dir.join("daemon.pid"),
37            data_root: cc_switch_dir.join("daemon-data"),
38            upstreams,
39            foreground,
40        })
41    }
42}
43
44fn dedupe_upstreams(storage: &ConfigStorage) -> Vec<Upstream> {
45    let mut seen = BTreeSet::new();
46    let mut result = Vec::new();
47    for config in storage.configurations.values() {
48        if config.url.is_empty() {
49            continue;
50        }
51        let key = ("claude".to_string(), config.url.clone());
52        if seen.insert(key.clone()) {
53            result.push(key);
54        }
55    }
56    result
57}
58
/// Returns an 8-character lowercase hex digest of `url` (the low 32 bits of its hash).
///
/// The digest is used as the per-upstream data directory name.
///
/// NOTE(review): the standard library does not guarantee `DefaultHasher` output to be
/// stable across Rust releases, so directory names may change after a toolchain
/// upgrade — confirm whether that matters for persisted data.
fn upstream_hash(url: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    url.hash(&mut state);
    let low_bits = state.finish() & u64::from(u32::MAX);
    format!("{low_bits:08x}")
}
65
66pub fn run_daemon_blocking(
67    cfg: LifecycleConfig,
68    log_level: Option<String>,
69    verbose: u8,
70) -> Result<()> {
71    let env_val = std::env::var("CCS_LOG").ok();
72    let level = crate::daemon::logging::resolve_log_level(
73        log_level.as_deref(),
74        verbose,
75        env_val.as_deref(),
76    );
77    let mode = if cfg.foreground {
78        crate::daemon::logging::LogMode::Foreground
79    } else {
80        crate::daemon::logging::LogMode::Background
81    };
82
83    crate::daemon::logging::cleanup_old_logs(7);
84    let _guard = crate::daemon::logging::init_tracing(mode, level);
85
86    let rt = tokio::runtime::Builder::new_multi_thread()
87        .enable_all()
88        .build()
89        .context("failed to build tokio runtime")?;
90
91    rt.block_on(run_daemon_async(cfg))
92}
93
94async fn run_daemon_async(cfg: LifecycleConfig) -> Result<()> {
95    let pidfile = Pidfile::new(cfg.pidfile_path.clone());
96    pidfile
97        .acquire()
98        .context("failed to acquire pidfile — is another daemon already running?")?;
99
100    std::fs::create_dir_all(&cfg.data_root)
101        .with_context(|| format!("failed to create data_root {}", cfg.data_root.display()))?;
102
103    let mut handles: Vec<ccs_proxy::ProxyHandle> = Vec::new();
104    let mut proxy_entries: Vec<ProxyEntry> = Vec::new();
105
106    for (_provider, upstream_url) in &cfg.upstreams {
107        let parsed_url = match Url::parse(upstream_url) {
108            Ok(u) => u,
109            Err(err) => {
110                tracing::warn!(upstream = %upstream_url, error = %err, "skipping invalid upstream URL");
111                continue;
112            }
113        };
114
115        let hash = upstream_hash(upstream_url);
116        let data_dir = cfg.data_root.join(&hash);
117
118        let mut serve_cfg = ccs_proxy::ServeConfig::new(
119            ccs_proxy::ProviderKind::Claude,
120            parsed_url,
121            data_dir.clone(),
122        );
123        serve_cfg.api_server = false;
124
125        match ccs_proxy::serve(serve_cfg).await {
126            Ok(handle) => {
127                proxy_entries.push(ProxyEntry {
128                    provider: "claude".to_string(),
129                    upstream: upstream_url.clone(),
130                    proxy_port: handle.proxy_port,
131                    api_port: handle.api_port,
132                    data_dir,
133                    started_at: chrono::Utc::now().to_rfc3339(),
134                    restart_count: 0,
135                });
136                handles.push(handle);
137            }
138            Err(err) => {
139                tracing::error!(upstream = %upstream_url, error = %err, "failed to start proxy");
140            }
141        }
142    }
143
144    // Build AliasMap from current storage
145    let storage = ConfigStorage::load().unwrap_or_default();
146    let alias_map = Arc::new(AliasMap::from_storage(&storage));
147
148    // Collect stores and event senders for aggregate
149    let agg_stores: Vec<_> = proxy_entries
150        .iter()
151        .map(|entry| {
152            let store = Arc::new(
153                FsStore::open(entry.data_dir.clone())
154                    .expect("store open should succeed — dir already created by proxy"),
155            );
156            (entry.upstream.clone(), store)
157        })
158        .collect();
159
160    let agg_events: Vec<_> = handles
161        .iter()
162        .zip(proxy_entries.iter())
163        .map(|(handle, entry)| (entry.upstream.clone(), handle.event_sender().clone()))
164        .collect();
165
166    // Start aggregate server (hold handle alive for daemon lifetime)
167    let agg_handle = match aggregate::serve(agg_stores, agg_events, alias_map, 0).await {
168        Ok(handle) => {
169            tracing::info!(port = handle.port, "aggregate dashboard available");
170            Some(handle)
171        }
172        Err(err) => {
173            tracing::warn!(error = %err, "failed to start aggregate server — proxies still work");
174            None
175        }
176    };
177    let agg_port = agg_handle.as_ref().map(|h| h.port);
178
179    let state = DaemonState {
180        schema_version: 2,
181        version: crate::daemon::state::CURRENT_VERSION.to_string(),
182        pid: std::process::id(),
183        started_at: chrono::Utc::now().to_rfc3339(),
184        stopped_at: None,
185        data_root: cfg.data_root.clone(),
186        agg_port,
187        proxies: proxy_entries.clone(),
188    };
189    state
190        .save(&cfg.state_path)
191        .context("failed to write initial daemon state")?;
192
193    tracing::info!(
194        pid = state.pid,
195        proxy_count = handles.len(),
196        "daemon started"
197    );
198
199    // Wait for shutdown signal.
200    let shutdown = async {
201        let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
202            .expect("failed to install SIGTERM handler");
203        let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
204            .expect("failed to install SIGINT handler");
205        tokio::select! {
206            _ = sigterm.recv() => {}
207            _ = sigint.recv() => {}
208        }
209    };
210
211    // Supervisor loop: check handle health every 30s, respawn if needed.
212    let supervisor = supervisor_loop(&mut handles, &mut proxy_entries, &cfg);
213
214    tokio::select! {
215        _ = shutdown => {
216            tracing::info!("daemon shutting down");
217        }
218        _ = supervisor => {
219            // supervisor_loop runs forever unless cancelled
220        }
221    }
222
223    // Graceful shutdown: stop aggregate and proxy servers.
224    if let Some(agg) = agg_handle {
225        agg.shutdown().await;
226    }
227    for handle in handles {
228        handle.shutdown().await;
229    }
230
231    // Write final state with stopped_at.
232    let final_state = DaemonState {
233        schema_version: 2,
234        version: crate::daemon::state::CURRENT_VERSION.to_string(),
235        pid: std::process::id(),
236        started_at: state.started_at,
237        stopped_at: Some(chrono::Utc::now().to_rfc3339()),
238        data_root: cfg.data_root,
239        agg_port: None,
240        proxies: proxy_entries,
241    };
242    let _ = final_state.save(&cfg.state_path);
243
244    // Remove pidfile.
245    let _ = pidfile.release();
246
247    tracing::info!("daemon stopped");
248    Ok(())
249}
250
251async fn supervisor_loop(
252    handles: &mut [ccs_proxy::ProxyHandle],
253    entries: &mut [ProxyEntry],
254    cfg: &LifecycleConfig,
255) {
256    loop {
257        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
258
259        for i in 0..handles.len() {
260            let finished = handles[i].is_finished();
261
262            if finished {
263                tracing::warn!(upstream = %entries[i].upstream, "proxy exited unexpectedly, respawning");
264
265                let parsed_url = match Url::parse(&entries[i].upstream) {
266                    Ok(u) => u,
267                    Err(_) => continue,
268                };
269
270                let serve_cfg = ccs_proxy::ServeConfig::new(
271                    ccs_proxy::ProviderKind::Claude,
272                    parsed_url,
273                    entries[i].data_dir.clone(),
274                );
275
276                match ccs_proxy::serve(serve_cfg).await {
277                    Ok(new_handle) => {
278                        entries[i].proxy_port = new_handle.proxy_port;
279                        entries[i].api_port = new_handle.api_port;
280                        entries[i].restart_count += 1;
281                        entries[i].started_at = chrono::Utc::now().to_rfc3339();
282                        handles[i] = new_handle;
283
284                        let state = DaemonState {
285                            schema_version: 2,
286                            version: crate::daemon::state::CURRENT_VERSION.to_string(),
287                            pid: std::process::id(),
288                            started_at: entries.first().map_or_else(
289                                || chrono::Utc::now().to_rfc3339(),
290                                |e| e.started_at.clone(),
291                            ),
292                            stopped_at: None,
293                            data_root: cfg.data_root.clone(),
294                            agg_port: None,
295                            proxies: entries.to_vec(),
296                        };
297                        let _ = state.save(&cfg.state_path);
298                    }
299                    Err(err) => {
300                        tracing::error!(upstream = %entries[i].upstream, error = %err, "failed to respawn proxy");
301                    }
302                }
303            }
304        }
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::types::Configuration;
    use std::collections::BTreeMap;

    /// Builds a storage with one configuration per URL, named `alias0`, `alias1`, ….
    fn make_storage(urls: &[&str]) -> ConfigStorage {
        let configurations: BTreeMap<_, _> = urls
            .iter()
            .enumerate()
            .map(|(index, url)| {
                let alias = format!("alias{index}");
                let configuration = Configuration {
                    alias_name: alias.clone(),
                    token: "sk-test".to_string(),
                    url: url.to_string(),
                    model: None,
                    small_fast_model: None,
                    max_thinking_tokens: None,
                    api_timeout_ms: None,
                    claude_code_disable_nonessential_traffic: None,
                    anthropic_default_sonnet_model: None,
                    anthropic_default_opus_model: None,
                    anthropic_default_haiku_model: None,
                    claude_code_experimental_agent_teams: None,
                    claude_code_disable_1m_context: None,
                    claude_code_subagent_model: None,
                    claude_code_disable_nonstreaming_fallback: None,
                    claude_code_effort_level: None,
                    disable_prompt_caching: None,
                    claude_code_disable_experimental_betas: None,
                    disable_autoupdater: None,
                };
                (alias, configuration)
            })
            .collect();

        ConfigStorage {
            configurations,
            claude_settings_dir: None,
            default_storage_mode: None,
            codex_configurations: None,
        }
    }

    /// Maps a storage built from `urls` to just the de-duplicated URLs.
    fn deduped_urls(urls: &[&str]) -> Vec<String> {
        dedupe_upstreams(&make_storage(urls))
            .into_iter()
            .map(|(_provider, url)| url)
            .collect()
    }

    #[test]
    fn dedupe_upstreams_removes_duplicates() {
        let urls = deduped_urls(&[
            "https://api.anthropic.com",
            "https://api.anthropic.com",
            "https://other.example.com/v1",
        ]);
        assert_eq!(
            urls,
            ["https://api.anthropic.com", "https://other.example.com/v1"]
        );
    }

    #[test]
    fn dedupe_upstreams_skips_empty_urls() {
        let urls = deduped_urls(&["", "https://api.anthropic.com"]);
        assert_eq!(urls, ["https://api.anthropic.com"]);
    }

    #[test]
    fn upstream_hash_is_deterministic() {
        let url = "https://api.anthropic.com";
        let first = upstream_hash(url);
        assert_eq!(first, upstream_hash(url));
        assert_eq!(first.len(), 8);
    }

    #[test]
    fn upstream_hash_differs_for_different_urls() {
        assert_ne!(
            upstream_hash("https://api.anthropic.com"),
            upstream_hash("https://other.example.com")
        );
    }
}
386}