1use crate::config::ConfigStorage;
4use crate::daemon::aggregate;
5use crate::daemon::aggregate::state::AliasMap;
6use crate::daemon::pidfile::Pidfile;
7use crate::daemon::state::{DaemonState, ProxyEntry};
8use anyhow::{Context, Result};
9use ccs_proxy::store::FsStore;
10use std::collections::BTreeSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13use url::Url;
14
15pub type Upstream = (String, String);
16
17pub struct LifecycleConfig {
18 pub state_path: PathBuf,
19 pub pidfile_path: PathBuf,
20 pub data_root: PathBuf,
21 pub upstreams: Vec<Upstream>,
22 pub foreground: bool,
23}
24
25impl LifecycleConfig {
26 pub fn from_storage(storage: &ConfigStorage, foreground: bool) -> Result<Self> {
27 let home = dirs::home_dir().context("could not find home directory")?;
28 let cc_switch_dir = home.join(".cc-switch");
29 std::fs::create_dir_all(&cc_switch_dir)
30 .with_context(|| format!("failed to create {}", cc_switch_dir.display()))?;
31
32 let upstreams = dedupe_upstreams(storage);
33
34 Ok(Self {
35 state_path: cc_switch_dir.join("daemon-state.json"),
36 pidfile_path: cc_switch_dir.join("daemon.pid"),
37 data_root: cc_switch_dir.join("daemon-data"),
38 upstreams,
39 foreground,
40 })
41 }
42}
43
44fn dedupe_upstreams(storage: &ConfigStorage) -> Vec<Upstream> {
45 let mut seen = BTreeSet::new();
46 let mut result = Vec::new();
47 for config in storage.configurations.values() {
48 if config.url.is_empty() {
49 continue;
50 }
51 let key = ("claude".to_string(), config.url.clone());
52 if seen.insert(key.clone()) {
53 result.push(key);
54 }
55 }
56 result
57}
58
59fn upstream_hash(url: &str) -> String {
60 use std::hash::{Hash, Hasher};
61 let mut hasher = std::collections::hash_map::DefaultHasher::new();
62 url.hash(&mut hasher);
63 format!("{:08x}", hasher.finish() & 0xFFFF_FFFF)
64}
65
66pub fn run_daemon_blocking(
67 cfg: LifecycleConfig,
68 log_level: Option<String>,
69 verbose: u8,
70) -> Result<()> {
71 let env_val = std::env::var("CCS_LOG").ok();
72 let level = crate::daemon::logging::resolve_log_level(
73 log_level.as_deref(),
74 verbose,
75 env_val.as_deref(),
76 );
77 let mode = if cfg.foreground {
78 crate::daemon::logging::LogMode::Foreground
79 } else {
80 crate::daemon::logging::LogMode::Background
81 };
82
83 crate::daemon::logging::cleanup_old_logs(7);
84 let _guard = crate::daemon::logging::init_tracing(mode, level);
85
86 let rt = tokio::runtime::Builder::new_multi_thread()
87 .enable_all()
88 .build()
89 .context("failed to build tokio runtime")?;
90
91 rt.block_on(run_daemon_async(cfg))
92}
93
94async fn run_daemon_async(cfg: LifecycleConfig) -> Result<()> {
95 let pidfile = Pidfile::new(cfg.pidfile_path.clone());
96 pidfile
97 .acquire()
98 .context("failed to acquire pidfile — is another daemon already running?")?;
99
100 std::fs::create_dir_all(&cfg.data_root)
101 .with_context(|| format!("failed to create data_root {}", cfg.data_root.display()))?;
102
103 let mut handles: Vec<ccs_proxy::ProxyHandle> = Vec::new();
104 let mut proxy_entries: Vec<ProxyEntry> = Vec::new();
105
106 for (_provider, upstream_url) in &cfg.upstreams {
107 let parsed_url = match Url::parse(upstream_url) {
108 Ok(u) => u,
109 Err(err) => {
110 tracing::warn!(upstream = %upstream_url, error = %err, "skipping invalid upstream URL");
111 continue;
112 }
113 };
114
115 let hash = upstream_hash(upstream_url);
116 let data_dir = cfg.data_root.join(&hash);
117
118 let mut serve_cfg = ccs_proxy::ServeConfig::new(
119 ccs_proxy::ProviderKind::Claude,
120 parsed_url,
121 data_dir.clone(),
122 );
123 serve_cfg.api_server = false;
124
125 match ccs_proxy::serve(serve_cfg).await {
126 Ok(handle) => {
127 proxy_entries.push(ProxyEntry {
128 provider: "claude".to_string(),
129 upstream: upstream_url.clone(),
130 proxy_port: handle.proxy_port,
131 api_port: handle.api_port,
132 data_dir,
133 started_at: chrono::Utc::now().to_rfc3339(),
134 restart_count: 0,
135 });
136 handles.push(handle);
137 }
138 Err(err) => {
139 tracing::error!(upstream = %upstream_url, error = %err, "failed to start proxy");
140 }
141 }
142 }
143
144 let storage = ConfigStorage::load().unwrap_or_default();
146 let alias_map = Arc::new(AliasMap::from_storage(&storage));
147
148 let agg_stores: Vec<_> = proxy_entries
150 .iter()
151 .map(|entry| {
152 let store = Arc::new(
153 FsStore::open(entry.data_dir.clone())
154 .expect("store open should succeed — dir already created by proxy"),
155 );
156 (entry.upstream.clone(), store)
157 })
158 .collect();
159
160 let agg_events: Vec<_> = handles
161 .iter()
162 .zip(proxy_entries.iter())
163 .map(|(handle, entry)| (entry.upstream.clone(), handle.event_sender().clone()))
164 .collect();
165
166 let agg_handle = match aggregate::serve(agg_stores, agg_events, alias_map, 0).await {
168 Ok(handle) => {
169 tracing::info!(port = handle.port, "aggregate dashboard available");
170 Some(handle)
171 }
172 Err(err) => {
173 tracing::warn!(error = %err, "failed to start aggregate server — proxies still work");
174 None
175 }
176 };
177 let agg_port = agg_handle.as_ref().map(|h| h.port);
178
179 let state = DaemonState {
180 schema_version: 2,
181 version: crate::daemon::state::CURRENT_VERSION.to_string(),
182 pid: std::process::id(),
183 started_at: chrono::Utc::now().to_rfc3339(),
184 stopped_at: None,
185 data_root: cfg.data_root.clone(),
186 agg_port,
187 proxies: proxy_entries.clone(),
188 };
189 state
190 .save(&cfg.state_path)
191 .context("failed to write initial daemon state")?;
192
193 tracing::info!(
194 pid = state.pid,
195 proxy_count = handles.len(),
196 "daemon started"
197 );
198
199 let shutdown = async {
201 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
202 .expect("failed to install SIGTERM handler");
203 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
204 .expect("failed to install SIGINT handler");
205 tokio::select! {
206 _ = sigterm.recv() => {}
207 _ = sigint.recv() => {}
208 }
209 };
210
211 let supervisor = supervisor_loop(&mut handles, &mut proxy_entries, &cfg);
213
214 tokio::select! {
215 _ = shutdown => {
216 tracing::info!("daemon shutting down");
217 }
218 _ = supervisor => {
219 }
221 }
222
223 if let Some(agg) = agg_handle {
225 agg.shutdown().await;
226 }
227 for handle in handles {
228 handle.shutdown().await;
229 }
230
231 let final_state = DaemonState {
233 schema_version: 2,
234 version: crate::daemon::state::CURRENT_VERSION.to_string(),
235 pid: std::process::id(),
236 started_at: state.started_at,
237 stopped_at: Some(chrono::Utc::now().to_rfc3339()),
238 data_root: cfg.data_root,
239 agg_port: None,
240 proxies: proxy_entries,
241 };
242 let _ = final_state.save(&cfg.state_path);
243
244 let _ = pidfile.release();
246
247 tracing::info!("daemon stopped");
248 Ok(())
249}
250
251async fn supervisor_loop(
252 handles: &mut [ccs_proxy::ProxyHandle],
253 entries: &mut [ProxyEntry],
254 cfg: &LifecycleConfig,
255) {
256 loop {
257 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
258
259 for i in 0..handles.len() {
260 let finished = handles[i].is_finished();
261
262 if finished {
263 tracing::warn!(upstream = %entries[i].upstream, "proxy exited unexpectedly, respawning");
264
265 let parsed_url = match Url::parse(&entries[i].upstream) {
266 Ok(u) => u,
267 Err(_) => continue,
268 };
269
270 let serve_cfg = ccs_proxy::ServeConfig::new(
271 ccs_proxy::ProviderKind::Claude,
272 parsed_url,
273 entries[i].data_dir.clone(),
274 );
275
276 match ccs_proxy::serve(serve_cfg).await {
277 Ok(new_handle) => {
278 entries[i].proxy_port = new_handle.proxy_port;
279 entries[i].api_port = new_handle.api_port;
280 entries[i].restart_count += 1;
281 entries[i].started_at = chrono::Utc::now().to_rfc3339();
282 handles[i] = new_handle;
283
284 let state = DaemonState {
285 schema_version: 2,
286 version: crate::daemon::state::CURRENT_VERSION.to_string(),
287 pid: std::process::id(),
288 started_at: entries.first().map_or_else(
289 || chrono::Utc::now().to_rfc3339(),
290 |e| e.started_at.clone(),
291 ),
292 stopped_at: None,
293 data_root: cfg.data_root.clone(),
294 agg_port: None,
295 proxies: entries.to_vec(),
296 };
297 let _ = state.save(&cfg.state_path);
298 }
299 Err(err) => {
300 tracing::error!(upstream = %entries[i].upstream, error = %err, "failed to respawn proxy");
301 }
302 }
303 }
304 }
305 }
306}
307
308#[cfg(test)]
309mod tests {
310 use super::*;
311 use crate::config::types::Configuration;
312 use std::collections::BTreeMap;
313
314 fn make_storage(urls: &[&str]) -> ConfigStorage {
315 let mut configurations = BTreeMap::new();
316 for (i, url) in urls.iter().enumerate() {
317 let alias = format!("alias{i}");
318 configurations.insert(
319 alias.clone(),
320 Configuration {
321 alias_name: alias,
322 token: "sk-test".to_string(),
323 url: url.to_string(),
324 model: None,
325 small_fast_model: None,
326 max_thinking_tokens: None,
327 api_timeout_ms: None,
328 claude_code_disable_nonessential_traffic: None,
329 anthropic_default_sonnet_model: None,
330 anthropic_default_opus_model: None,
331 anthropic_default_haiku_model: None,
332 claude_code_experimental_agent_teams: None,
333 claude_code_disable_1m_context: None,
334 claude_code_subagent_model: None,
335 claude_code_disable_nonstreaming_fallback: None,
336 claude_code_effort_level: None,
337 disable_prompt_caching: None,
338 claude_code_disable_experimental_betas: None,
339 disable_autoupdater: None,
340 },
341 );
342 }
343 ConfigStorage {
344 configurations,
345 claude_settings_dir: None,
346 default_storage_mode: None,
347 codex_configurations: None,
348 }
349 }
350
351 #[test]
352 fn dedupe_upstreams_removes_duplicates() {
353 let storage = make_storage(&[
354 "https://api.anthropic.com",
355 "https://api.anthropic.com",
356 "https://other.example.com/v1",
357 ]);
358 let result = dedupe_upstreams(&storage);
359 assert_eq!(result.len(), 2);
360 assert_eq!(result[0].1, "https://api.anthropic.com");
361 assert_eq!(result[1].1, "https://other.example.com/v1");
362 }
363
364 #[test]
365 fn dedupe_upstreams_skips_empty_urls() {
366 let storage = make_storage(&["", "https://api.anthropic.com"]);
367 let result = dedupe_upstreams(&storage);
368 assert_eq!(result.len(), 1);
369 assert_eq!(result[0].1, "https://api.anthropic.com");
370 }
371
372 #[test]
373 fn upstream_hash_is_deterministic() {
374 let h1 = upstream_hash("https://api.anthropic.com");
375 let h2 = upstream_hash("https://api.anthropic.com");
376 assert_eq!(h1, h2);
377 assert_eq!(h1.len(), 8);
378 }
379
380 #[test]
381 fn upstream_hash_differs_for_different_urls() {
382 let h1 = upstream_hash("https://api.anthropic.com");
383 let h2 = upstream_hash("https://other.example.com");
384 assert_ne!(h1, h2);
385 }
386}