cc-switch 0.1.38

Switch between multiple Claude / Codex configurations. Optional daemon proxies traffic to a built-in dashboard — requests, conversations, token stats. Cross-platform.
Documentation
use crate::daemon::aggregate::state::AliasMap;
use ccs_proxy::CaptureEvent;
use serde::Serialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

/// One proxy's capture feed: an upstream identifier (the tests in this file
/// use the upstream URL) paired with a receiver subscribed to that proxy's
/// [`CaptureEvent`] broadcast channel.
pub type ProxyEventReceiver = (String, broadcast::Receiver<CaptureEvent>);

/// A [`CaptureEvent`] annotated with the proxy feed it was received on.
///
/// `event_merger` builds one of these for every event it forwards.
#[derive(Debug, Clone, Serialize)]
pub struct TaggedCaptureEvent {
    /// Upstream identifier of the feed the event arrived on.
    pub upstream: String,
    /// Aliases the alias map returned for `upstream`; empty when it has none.
    pub aliases: Vec<String>,
    /// The original event. Flattened, so its fields serialize next to
    /// `upstream` and `aliases` instead of under an `inner` key.
    #[serde(flatten)]
    pub inner: CaptureEvent,
}

/// Merges every per-proxy capture feed into `merged_tx`, tagging each event
/// with the upstream it came from and the aliases registered for it.
///
/// * `proxy_events` - `(upstream, receiver)` pairs, one per proxy feed.
/// * `alias_map` - queried once per event via `aliases_for(&upstream)`.
/// * `merged_tx` - channel the tagged events are published on.
///
/// The future completes when the merged stream ends, which for
/// `select_all` means every input feed has finished (an empty
/// `proxy_events` therefore returns immediately).
///
/// Two kinds of failure are deliberately ignored:
/// * items for which `BroadcastStream` yields an error (for tokio's broadcast
///   channel that is the "receiver lagged" notification) are skipped, so a
///   slow merger silently loses the overwritten events;
/// * the result of `merged_tx.send` is discarded, so an event published
///   while nobody is subscribed to the merged channel is dropped.
pub async fn event_merger(
    proxy_events: Vec<ProxyEventReceiver>,
    alias_map: Arc<AliasMap>,
    merged_tx: broadcast::Sender<TaggedCaptureEvent>,
) {
    let streams: Vec<_> = proxy_events
        .into_iter()
        .map(|(upstream, rx)| {
            // `upstream` is already owned here, so it moves straight into the
            // closure; only the per-event copy below is actually required.
            BroadcastStream::new(rx)
                .filter_map(move |res| res.ok().map(|ev| (upstream.clone(), ev)))
        })
        .collect();

    let mut merged = futures::stream::select_all(streams);

    while let Some((upstream, event)) = merged.next().await {
        // Look the aliases up before `upstream` is moved into the struct.
        let aliases = alias_map.aliases_for(&upstream);
        let tagged = TaggedCaptureEvent {
            upstream,
            aliases,
            inner: event,
        };
        // Best effort: having no subscribers is not an error for the merger.
        let _ = merged_tx.send(tagged);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ccs_proxy::CaptureEvent;
    use std::time::Duration;
    use tokio::sync::broadcast;

    const UPSTREAM_A: &str = "https://a.example.com";
    const UPSTREAM_B: &str = "https://b.example.com";

    /// Builds a `RequestStarted` event with `seq` 1 and the current time.
    fn request_started(session_id: &str, model: Option<&str>) -> CaptureEvent {
        CaptureEvent::RequestStarted {
            session_id: session_id.to_string(),
            seq: 1,
            started_at: chrono::Utc::now(),
            model: model.map(String::from),
        }
    }

    /// Waits up to one second for the next merged event; panics on timeout
    /// or on a receive error.
    async fn next_tagged(rx: &mut broadcast::Receiver<TaggedCaptureEvent>) -> TaggedCaptureEvent {
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap()
            .unwrap()
    }

    /// An event from an upstream with a registered alias is forwarded with
    /// both the upstream and that alias attached.
    #[tokio::test]
    async fn merger_tags_events_with_upstream() {
        let (tx_a, _) = broadcast::channel::<CaptureEvent>(16);
        let (tx_b, _) = broadcast::channel::<CaptureEvent>(16);
        let (merged_tx, mut merged_rx) = broadcast::channel::<TaggedCaptureEvent>(64);

        let entries = vec![(UPSTREAM_A.to_string(), vec!["alias_a".to_string()])];
        let alias_map = Arc::new(AliasMap::from_entries(entries));

        let feeds = vec![
            (UPSTREAM_A.to_string(), tx_a.subscribe()),
            (UPSTREAM_B.to_string(), tx_b.subscribe()),
        ];
        let _merger = tokio::spawn(event_merger(feeds, alias_map, merged_tx));

        // Presumably gives the spawned merger a moment to start polling.
        tokio::time::sleep(Duration::from_millis(10)).await;

        tx_a.send(request_started("sess1", Some("claude-sonnet-4-6")))
            .unwrap();

        let tagged = next_tagged(&mut merged_rx).await;
        assert_eq!(tagged.upstream, UPSTREAM_A);
        assert_eq!(tagged.aliases, vec!["alias_a"]);
    }

    /// An upstream missing from the alias map still has its events forwarded,
    /// just with an empty alias list.
    #[tokio::test]
    async fn merger_handles_unknown_upstream_aliases() {
        let (tx_b, _) = broadcast::channel::<CaptureEvent>(16);
        let (merged_tx, mut merged_rx) = broadcast::channel::<TaggedCaptureEvent>(64);

        let alias_map = Arc::new(AliasMap::from_entries(vec![]));
        let feeds = vec![(UPSTREAM_B.to_string(), tx_b.subscribe())];
        let _merger = tokio::spawn(event_merger(feeds, alias_map, merged_tx));

        // Presumably gives the spawned merger a moment to start polling.
        tokio::time::sleep(Duration::from_millis(10)).await;

        tx_b.send(request_started("sess2", None)).unwrap();

        let tagged = next_tagged(&mut merged_rx).await;
        assert_eq!(tagged.upstream, UPSTREAM_B);
        assert!(tagged.aliases.is_empty());
    }
}