1use crate::config::ConfigStorage;
4use crate::daemon::aggregate;
5use crate::daemon::aggregate::state::AliasMap;
6use crate::daemon::pidfile::Pidfile;
7use crate::daemon::state::{DaemonState, ProxyEntry};
8use anyhow::{Context, Result};
9use ccs_proxy::store::FsStore;
10use std::collections::BTreeSet;
11use std::path::PathBuf;
12use std::sync::Arc;
13use url::Url;
14
15pub type Upstream = (String, String);
16
17pub struct LifecycleConfig {
18 pub state_path: PathBuf,
19 pub pidfile_path: PathBuf,
20 pub data_root: PathBuf,
21 pub upstreams: Vec<Upstream>,
22 pub foreground: bool,
23}
24
25impl LifecycleConfig {
26 pub fn from_storage(storage: &ConfigStorage, foreground: bool) -> Result<Self> {
27 let home = dirs::home_dir().context("could not find home directory")?;
28 let cc_switch_dir = home.join(".cc-switch");
29 std::fs::create_dir_all(&cc_switch_dir)
30 .with_context(|| format!("failed to create {}", cc_switch_dir.display()))?;
31
32 let upstreams = dedupe_upstreams(storage);
33
34 Ok(Self {
35 state_path: cc_switch_dir.join("daemon-state.json"),
36 pidfile_path: cc_switch_dir.join("daemon.pid"),
37 data_root: cc_switch_dir.join("daemon-data"),
38 upstreams,
39 foreground,
40 })
41 }
42}
43
44fn dedupe_upstreams(storage: &ConfigStorage) -> Vec<Upstream> {
45 let mut seen = BTreeSet::new();
46 let mut result = Vec::new();
47 for config in storage.configurations.values() {
48 if config.url.is_empty() {
49 continue;
50 }
51 let key = ("claude".to_string(), config.url.clone());
52 if seen.insert(key.clone()) {
53 result.push(key);
54 }
55 }
56 result
57}
58
59fn upstream_hash(url: &str) -> String {
60 use std::hash::{Hash, Hasher};
61 let mut hasher = std::collections::hash_map::DefaultHasher::new();
62 url.hash(&mut hasher);
63 format!("{:08x}", hasher.finish() & 0xFFFF_FFFF)
64}
65
66pub fn run_daemon_blocking(
67 cfg: LifecycleConfig,
68 log_level: Option<String>,
69 verbose: u8,
70) -> Result<()> {
71 let env_val = std::env::var("CCS_LOG").ok();
72 let level = crate::daemon::logging::resolve_log_level(
73 log_level.as_deref(),
74 verbose,
75 env_val.as_deref(),
76 );
77 let mode = if cfg.foreground {
78 crate::daemon::logging::LogMode::Foreground
79 } else {
80 crate::daemon::logging::LogMode::Background
81 };
82
83 crate::daemon::logging::cleanup_old_logs(7);
84 let _guard = crate::daemon::logging::init_tracing(mode, level);
85
86 let rt = tokio::runtime::Builder::new_multi_thread()
87 .enable_all()
88 .build()
89 .context("failed to build tokio runtime")?;
90
91 rt.block_on(run_daemon_async(cfg))
92}
93
94async fn run_daemon_async(cfg: LifecycleConfig) -> Result<()> {
95 let pidfile = Pidfile::new(cfg.pidfile_path.clone());
96 pidfile
97 .acquire()
98 .context("failed to acquire pidfile — is another daemon already running?")?;
99
100 std::fs::create_dir_all(&cfg.data_root)
101 .with_context(|| format!("failed to create data_root {}", cfg.data_root.display()))?;
102
103 let mut handles: Vec<ccs_proxy::ProxyHandle> = Vec::new();
104 let mut proxy_entries: Vec<ProxyEntry> = Vec::new();
105
106 for (_provider, upstream_url) in &cfg.upstreams {
107 let parsed_url = match Url::parse(upstream_url) {
108 Ok(u) => u,
109 Err(err) => {
110 tracing::warn!(upstream = %upstream_url, error = %err, "skipping invalid upstream URL");
111 continue;
112 }
113 };
114
115 let hash = upstream_hash(upstream_url);
116 let data_dir = cfg.data_root.join(&hash);
117
118 let mut serve_cfg = ccs_proxy::ServeConfig::new(
119 ccs_proxy::ProviderKind::Claude,
120 parsed_url,
121 data_dir.clone(),
122 );
123 serve_cfg.api_server = false;
124
125 match ccs_proxy::serve(serve_cfg).await {
126 Ok(handle) => {
127 proxy_entries.push(ProxyEntry {
128 provider: "claude".to_string(),
129 upstream: upstream_url.clone(),
130 proxy_port: handle.proxy_port,
131 api_port: handle.api_port,
132 data_dir,
133 started_at: chrono::Utc::now().to_rfc3339(),
134 restart_count: 0,
135 });
136 handles.push(handle);
137 }
138 Err(err) => {
139 tracing::error!(upstream = %upstream_url, error = %err, "failed to start proxy");
140 }
141 }
142 }
143
144 let storage = ConfigStorage::load().unwrap_or_default();
146 let alias_map = Arc::new(AliasMap::from_storage(&storage));
147
148 let agg_stores: Vec<_> = proxy_entries
150 .iter()
151 .map(|entry| {
152 let store = Arc::new(
153 FsStore::open(entry.data_dir.clone())
154 .expect("store open should succeed — dir already created by proxy"),
155 );
156 (entry.upstream.clone(), store)
157 })
158 .collect();
159
160 let agg_events: Vec<_> = handles
161 .iter()
162 .zip(proxy_entries.iter())
163 .map(|(handle, entry)| (entry.upstream.clone(), handle.event_sender().clone()))
164 .collect();
165
166 let agg_handle = match aggregate::serve(agg_stores, agg_events, alias_map, 0).await {
168 Ok(handle) => {
169 tracing::info!(port = handle.port, "aggregate dashboard available");
170 Some(handle)
171 }
172 Err(err) => {
173 tracing::warn!(error = %err, "failed to start aggregate server — proxies still work");
174 None
175 }
176 };
177 let agg_port = agg_handle.as_ref().map(|h| h.port);
178
179 let state = DaemonState {
180 schema_version: 2,
181 pid: std::process::id(),
182 started_at: chrono::Utc::now().to_rfc3339(),
183 stopped_at: None,
184 data_root: cfg.data_root.clone(),
185 agg_port,
186 proxies: proxy_entries.clone(),
187 };
188 state
189 .save(&cfg.state_path)
190 .context("failed to write initial daemon state")?;
191
192 tracing::info!(
193 pid = state.pid,
194 proxy_count = handles.len(),
195 "daemon started"
196 );
197
198 let shutdown = async {
200 let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
201 .expect("failed to install SIGTERM handler");
202 let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
203 .expect("failed to install SIGINT handler");
204 tokio::select! {
205 _ = sigterm.recv() => {}
206 _ = sigint.recv() => {}
207 }
208 };
209
210 let supervisor = supervisor_loop(&mut handles, &mut proxy_entries, &cfg);
212
213 tokio::select! {
214 _ = shutdown => {
215 tracing::info!("daemon shutting down");
216 }
217 _ = supervisor => {
218 }
220 }
221
222 if let Some(agg) = agg_handle {
224 agg.shutdown().await;
225 }
226 for handle in handles {
227 handle.shutdown().await;
228 }
229
230 let final_state = DaemonState {
232 schema_version: 2,
233 pid: std::process::id(),
234 started_at: state.started_at,
235 stopped_at: Some(chrono::Utc::now().to_rfc3339()),
236 data_root: cfg.data_root,
237 agg_port: None,
238 proxies: proxy_entries,
239 };
240 let _ = final_state.save(&cfg.state_path);
241
242 let _ = pidfile.release();
244
245 tracing::info!("daemon stopped");
246 Ok(())
247}
248
249async fn supervisor_loop(
250 handles: &mut [ccs_proxy::ProxyHandle],
251 entries: &mut [ProxyEntry],
252 cfg: &LifecycleConfig,
253) {
254 loop {
255 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
256
257 for i in 0..handles.len() {
258 let finished = handles[i].is_finished();
259
260 if finished {
261 tracing::warn!(upstream = %entries[i].upstream, "proxy exited unexpectedly, respawning");
262
263 let parsed_url = match Url::parse(&entries[i].upstream) {
264 Ok(u) => u,
265 Err(_) => continue,
266 };
267
268 let serve_cfg = ccs_proxy::ServeConfig::new(
269 ccs_proxy::ProviderKind::Claude,
270 parsed_url,
271 entries[i].data_dir.clone(),
272 );
273
274 match ccs_proxy::serve(serve_cfg).await {
275 Ok(new_handle) => {
276 entries[i].proxy_port = new_handle.proxy_port;
277 entries[i].api_port = new_handle.api_port;
278 entries[i].restart_count += 1;
279 entries[i].started_at = chrono::Utc::now().to_rfc3339();
280 handles[i] = new_handle;
281
282 let state = DaemonState {
283 schema_version: 2,
284 pid: std::process::id(),
285 started_at: entries.first().map_or_else(
286 || chrono::Utc::now().to_rfc3339(),
287 |e| e.started_at.clone(),
288 ),
289 stopped_at: None,
290 data_root: cfg.data_root.clone(),
291 agg_port: None,
292 proxies: entries.to_vec(),
293 };
294 let _ = state.save(&cfg.state_path);
295 }
296 Err(err) => {
297 tracing::error!(upstream = %entries[i].upstream, error = %err, "failed to respawn proxy");
298 }
299 }
300 }
301 }
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308 use crate::config::types::Configuration;
309 use std::collections::BTreeMap;
310
311 fn make_storage(urls: &[&str]) -> ConfigStorage {
312 let mut configurations = BTreeMap::new();
313 for (i, url) in urls.iter().enumerate() {
314 let alias = format!("alias{i}");
315 configurations.insert(
316 alias.clone(),
317 Configuration {
318 alias_name: alias,
319 token: "sk-test".to_string(),
320 url: url.to_string(),
321 model: None,
322 small_fast_model: None,
323 max_thinking_tokens: None,
324 api_timeout_ms: None,
325 claude_code_disable_nonessential_traffic: None,
326 anthropic_default_sonnet_model: None,
327 anthropic_default_opus_model: None,
328 anthropic_default_haiku_model: None,
329 claude_code_experimental_agent_teams: None,
330 claude_code_disable_1m_context: None,
331 claude_code_subagent_model: None,
332 claude_code_disable_nonstreaming_fallback: None,
333 claude_code_effort_level: None,
334 disable_prompt_caching: None,
335 claude_code_disable_experimental_betas: None,
336 disable_autoupdater: None,
337 },
338 );
339 }
340 ConfigStorage {
341 configurations,
342 claude_settings_dir: None,
343 default_storage_mode: None,
344 codex_configurations: None,
345 }
346 }
347
348 #[test]
349 fn dedupe_upstreams_removes_duplicates() {
350 let storage = make_storage(&[
351 "https://api.anthropic.com",
352 "https://api.anthropic.com",
353 "https://other.example.com/v1",
354 ]);
355 let result = dedupe_upstreams(&storage);
356 assert_eq!(result.len(), 2);
357 assert_eq!(result[0].1, "https://api.anthropic.com");
358 assert_eq!(result[1].1, "https://other.example.com/v1");
359 }
360
361 #[test]
362 fn dedupe_upstreams_skips_empty_urls() {
363 let storage = make_storage(&["", "https://api.anthropic.com"]);
364 let result = dedupe_upstreams(&storage);
365 assert_eq!(result.len(), 1);
366 assert_eq!(result[0].1, "https://api.anthropic.com");
367 }
368
369 #[test]
370 fn upstream_hash_is_deterministic() {
371 let h1 = upstream_hash("https://api.anthropic.com");
372 let h2 = upstream_hash("https://api.anthropic.com");
373 assert_eq!(h1, h2);
374 assert_eq!(h1.len(), 8);
375 }
376
377 #[test]
378 fn upstream_hash_differs_for_different_urls() {
379 let h1 = upstream_hash("https://api.anthropic.com");
380 let h2 = upstream_hash("https://other.example.com");
381 assert_ne!(h1, h2);
382 }
383}