use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use base64::Engine;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
/// OAuth `client_id` sent with the device-code, token-poll and refresh requests.
pub const QWEN_OAUTH_CLIENT_ID: &str = "f0304373b74a44d2b584a3fb70ca9e56";
/// Space-separated scopes requested when the device flow is started.
pub const QWEN_OAUTH_SCOPE: &str = "openid profile email model.completion";
/// Endpoint that `start_device_flow` posts to in order to obtain a device/user code.
pub const QWEN_DEVICE_CODE_URL: &str = "https://chat.qwen.ai/api/v1/oauth2/device/code";
/// Token endpoint used both by `poll_for_token` and by `refresh_credentials`.
pub const QWEN_TOKEN_URL: &str = "https://chat.qwen.ai/api/v1/oauth2/token";
/// `grant_type` value sent while polling the token endpoint with a device code.
pub const DEVICE_CODE_GRANT: &str = "urn:ietf:params:oauth:grant-type:device_code";
/// Host (scheme-less, including path prefix) used by `chat_completions_url` when the
/// credentials carry an empty `resource_url`.
pub const QWEN_DEFAULT_API_HOST: &str = "dashscope.aliyuncs.com/compatible-mode/v1";
/// Chat URL returned by `QwenTokenManager::current_chat_url` when the state lock is poisoned.
pub const QWEN_DEFAULT_CHAT_URL: &str = "https://portal.qwen.ai/v1/chat/completions";
/// Model identifier for the OAuth-backed provider.
/// NOTE(review): not referenced in this part of the file — presumably consumed by callers; confirm.
pub const QWEN_OAUTH_MODEL: &str = "coder-model";
/// Safety margin in milliseconds: `is_valid` treats a token as stale once fewer than this
/// many milliseconds remain before `expiry_date`.
const REFRESH_BUFFER_MS: u64 = 30_000;
/// OAuth credentials for one Qwen account.
///
/// This is the shape (de)serialized as JSON to/from the `oauth_creds.json` file
/// (see `import_from_qwen_cli` / `write_back_to_qwen_cli`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QwenCredentials {
/// Bearer token sent with API requests.
pub access_token: String,
/// Token type as reported by the token endpoint (`"Bearer"` is used as the fallback).
pub token_type: String,
/// Token exchanged by `refresh_credentials` for a new access token.
pub refresh_token: String,
/// Host or base URL for chat requests; empty (also the value when the field is missing
/// from the JSON) makes `chat_completions_url` fall back to `QWEN_DEFAULT_API_HOST`.
#[serde(default)]
pub resource_url: String,
/// Absolute expiry time in milliseconds since the Unix epoch.
pub expiry_date: u64,
}
impl QwenCredentials {
/// Returns the path of the shared credentials file: `<home>/.qwen/oauth_creds.json`.
///
/// When no home directory can be determined, `/` is used as the base instead.
pub fn qwen_cli_path() -> PathBuf {
    let mut location = match dirs::home_dir() {
        Some(home) => home,
        None => PathBuf::from("/"),
    };
    location.push(".qwen");
    location.push("oauth_creds.json");
    location
}
/// Loads credentials from the file at [`Self::qwen_cli_path`].
///
/// Returns `None` when the file cannot be read, is not valid JSON for this struct,
/// or carries an empty access token or refresh token.
pub fn import_from_qwen_cli() -> Option<Self> {
    let raw = std::fs::read(Self::qwen_cli_path()).ok()?;
    serde_json::from_slice::<Self>(&raw)
        .ok()
        .filter(|parsed| !parsed.access_token.is_empty() && !parsed.refresh_token.is_empty())
}
/// Returns the modification time of the credentials file in whole seconds since the
/// Unix epoch, or `0` when the file, its metadata, or a usable timestamp is unavailable.
pub fn qwen_cli_mtime() -> u64 {
    let Ok(meta) = std::fs::metadata(Self::qwen_cli_path()) else {
        return 0;
    };
    let Ok(modified) = meta.modified() else {
        return 0;
    };
    match modified.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
pub fn write_back_to_qwen_cli(&self) {
let path = Self::qwen_cli_path();
let Ok(json) = serde_json::to_vec_pretty(self) else {
tracing::warn!("Failed to serialize Qwen creds for qwen-cli write-back");
return;
};
if let Some(parent) = path.parent()
&& let Err(e) = std::fs::create_dir_all(parent)
{
tracing::warn!("Failed to create {} parent dir: {}", parent.display(), e);
return;
}
if let Err(e) = std::fs::write(&path, &json) {
tracing::warn!("Failed to write back to {}: {}", path.display(), e);
return;
}
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
}
}
/// Builds credentials from a provider config entry.
///
/// `api_key` becomes the access token. Returns `None` unless both `api_key` and
/// `refresh_token` are present and non-empty. The token type is always `"Bearer"`;
/// a missing `resource_url` becomes empty and a missing `expiry_date` becomes `0`.
pub fn from_provider_config(cfg: &crate::config::ProviderConfig) -> Option<Self> {
    let non_empty =
        |field: &Option<String>| field.as_ref().filter(|value| !value.is_empty()).cloned();

    let access_token = non_empty(&cfg.api_key)?;
    let refresh_token = non_empty(&cfg.refresh_token)?;
    let resource_url = cfg.resource_url.clone().unwrap_or_default();
    let expiry_date = cfg.expiry_date.unwrap_or(0);

    Some(Self {
        access_token,
        token_type: "Bearer".to_string(),
        refresh_token,
        resource_url,
        expiry_date,
    })
}
/// Removes stored Qwen credentials that can no longer be refreshed.
///
/// Two locations are cleaned, both best-effort:
/// 1. the `api_key`, `refresh_token`, `expiry_date` and `resource_url` keys of
///    `[providers.qwen]` in keys.toml;
/// 2. the credentials file at [`Self::qwen_cli_path`].
///
/// keys.toml is only rewritten when at least one key was actually removed, and the
/// "wiped" messages are only logged when the corresponding write/delete succeeded;
/// failures are reported with `tracing::warn!`.
pub fn wipe_dead_credentials() {
    // `keys_path` is invoked through `catch_unwind` so a panic while resolving the path
    // simply skips the keys.toml cleanup.
    if let Ok(path) = std::panic::catch_unwind(crate::config::keys_path)
        && path.exists()
        && let Ok(raw) = std::fs::read_to_string(&path)
        && let Ok(mut doc) = raw.parse::<toml::Value>()
    {
        let mut removed_any = false;
        if let Some(qwen) = doc
            .get_mut("providers")
            .and_then(|p| p.get_mut("qwen"))
            .and_then(|q| q.as_table_mut())
        {
            for key in ["api_key", "refresh_token", "expiry_date", "resource_url"] {
                removed_any |= qwen.remove(key).is_some();
            }
        }
        if removed_any {
            let outcome = toml::to_string_pretty(&doc)
                .map_err(|e| e.to_string())
                .and_then(|out| std::fs::write(&path, out).map_err(|e| e.to_string()));
            match outcome {
                Ok(()) => {
                    tracing::info!("Wiped dead Qwen single-account creds from keys.toml");
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to wipe dead Qwen creds from {}: {}",
                        path.display(),
                        e
                    );
                }
            }
        }
    }

    let cli_path = Self::qwen_cli_path();
    if cli_path.exists() {
        match std::fs::remove_file(&cli_path) {
            Ok(()) => tracing::info!("Wiped dead Qwen creds from {}", cli_path.display()),
            Err(e) => tracing::warn!("Failed to remove {}: {}", cli_path.display(), e),
        }
    }
}
pub fn persist_to_keys(&self) -> anyhow::Result<()> {
use crate::config::{daily_backup, keys_path};
use anyhow::Context;
let path = keys_path();
let mut doc: toml::Value = if path.exists() {
let content = std::fs::read_to_string(&path)?;
toml::from_str(&content).unwrap_or(toml::Value::Table(toml::map::Map::new()))
} else {
toml::Value::Table(toml::map::Map::new())
};
let root = doc
.as_table_mut()
.context("keys.toml root is not a table")?;
let providers = root
.entry("providers".to_string())
.or_insert_with(|| toml::Value::Table(toml::map::Map::new()))
.as_table_mut()
.context("[providers] is not a table")?;
let qwen = providers
.entry("qwen".to_string())
.or_insert_with(|| toml::Value::Table(toml::map::Map::new()))
.as_table_mut()
.context("[providers.qwen] is not a table")?;
qwen.insert(
"api_key".to_string(),
toml::Value::String(self.access_token.clone()),
);
qwen.insert(
"refresh_token".to_string(),
toml::Value::String(self.refresh_token.clone()),
);
qwen.insert(
"expiry_date".to_string(),
toml::Value::Integer(self.expiry_date as i64),
);
if !self.resource_url.is_empty() {
qwen.insert(
"resource_url".to_string(),
toml::Value::String(self.resource_url.clone()),
);
}
qwen.remove("enabled");
qwen.remove("default_model");
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
daily_backup(&path, 7);
let toml_str = toml::to_string_pretty(&doc)?;
std::fs::write(&path, toml_str)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
}
tracing::debug!("Persisted Qwen credentials to keys.toml");
Ok(())
}
/// Converts every config entry that yields credentials via
/// [`Self::from_provider_config`]; entries that return `None` are skipped.
pub fn from_account_configs(cfgs: &[crate::config::ProviderConfig]) -> Vec<Self> {
    let mut accounts = Vec::new();
    for cfg in cfgs {
        if let Some(creds) = Self::from_provider_config(cfg) {
            accounts.push(creds);
        }
    }
    accounts
}
pub fn persist_all_accounts(accounts: &[Self]) -> anyhow::Result<()> {
use crate::config::{daily_backup, keys_path};
use anyhow::Context;
let path = keys_path();
let mut doc: toml::Value = if path.exists() {
let content = std::fs::read_to_string(&path)?;
toml::from_str(&content).unwrap_or(toml::Value::Table(toml::map::Map::new()))
} else {
toml::Value::Table(toml::map::Map::new())
};
let root = doc
.as_table_mut()
.context("keys.toml root is not a table")?;
let providers = root
.entry("providers".to_string())
.or_insert_with(|| toml::Value::Table(toml::map::Map::new()))
.as_table_mut()
.context("[providers] is not a table")?;
let arr: Vec<toml::Value> = accounts
.iter()
.map(|a| {
let mut tbl = toml::map::Map::new();
tbl.insert(
"api_key".to_string(),
toml::Value::String(a.access_token.clone()),
);
tbl.insert(
"refresh_token".to_string(),
toml::Value::String(a.refresh_token.clone()),
);
tbl.insert(
"expiry_date".to_string(),
toml::Value::Integer(a.expiry_date as i64),
);
if !a.resource_url.is_empty() {
tbl.insert(
"resource_url".to_string(),
toml::Value::String(a.resource_url.clone()),
);
}
toml::Value::Table(tbl)
})
.collect();
providers.insert("qwen_accounts".to_string(), toml::Value::Array(arr));
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
daily_backup(&path, 7);
let toml_str = toml::to_string_pretty(&doc)?;
std::fs::write(&path, toml_str)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o600));
}
tracing::debug!(
"Persisted {} Qwen rotation accounts to keys.toml",
accounts.len()
);
Ok(())
}
/// Reports whether the access token is still usable.
///
/// The token counts as valid only while more than `REFRESH_BUFFER_MS` milliseconds
/// remain before `expiry_date`. If the system clock reads earlier than the Unix epoch,
/// the current time is taken as `0`.
pub fn is_valid(&self) -> bool {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    };
    self.expiry_date > now_ms + REFRESH_BUFFER_MS
}
/// Builds the chat-completions endpoint URL for these credentials.
///
/// Steps:
/// 1. use `resource_url`, or `QWEN_DEFAULT_API_HOST` when it is empty;
/// 2. prepend `https://` unless an `http://` / `https://` scheme is already present;
/// 3. strip trailing slashes, then append `/v1` unless the base already ends with it;
/// 4. append `/chat/completions`.
///
/// Trailing slashes are removed *before* the `/v1` check so that a base such as
/// `https://host/v1/` yields `https://host/v1/chat/completions` rather than a doubled
/// `/v1/v1` segment.
pub fn chat_completions_url(&self) -> String {
    let host = if self.resource_url.is_empty() {
        QWEN_DEFAULT_API_HOST
    } else {
        self.resource_url.as_str()
    };
    let with_scheme = if host.starts_with("http://") || host.starts_with("https://") {
        host.to_string()
    } else {
        format!("https://{}", host)
    };
    let base = with_scheme.trim_end_matches('/');
    if base.ends_with("/v1") {
        format!("{}/chat/completions", base)
    } else {
        format!("{}/v1/chat/completions", base)
    }
}
}
/// A PKCE verifier together with its derived challenge.
#[derive(Debug, Clone)]
pub struct PkcePair {
/// Secret value sent as `code_verifier` when polling for the token.
pub verifier: String,
/// Value sent as `code_challenge` (method `S256`) when starting the device flow.
pub challenge: String,
}
impl PkcePair {
    /// Creates a fresh pair.
    ///
    /// The verifier is 32 random bytes encoded as unpadded URL-safe base64; the challenge
    /// is the SHA-256 digest of the verifier's text, encoded the same way.
    pub fn generate() -> Self {
        use rand::RngCore;

        let b64 = &base64::engine::general_purpose::URL_SAFE_NO_PAD;

        let mut entropy = [0u8; 32];
        rand::rng().fill_bytes(&mut entropy);
        let verifier = b64.encode(entropy);

        let challenge = b64.encode(Sha256::digest(verifier.as_bytes()));

        Self {
            verifier,
            challenge,
        }
    }
}
/// JSON body returned by the device-code endpoint (see `start_device_flow`).
#[derive(Debug, Clone, Deserialize)]
pub struct DeviceCodeResponse {
/// Code passed to `poll_for_token` as `device_code`.
pub device_code: String,
/// NOTE(review): presumably the short code the user enters at `verification_uri` — confirm.
pub user_code: String,
/// NOTE(review): presumably the page where the user authorizes the device — confirm.
pub verification_uri: String,
/// Optional; `None` when absent from the response.
#[serde(default)]
pub verification_uri_complete: Option<String>,
/// NOTE(review): presumably the lifetime of the device code in seconds — confirm.
pub expires_in: u64,
/// Optional; `None` when absent from the response.
/// NOTE(review): presumably the suggested polling interval in seconds — `poll_for_token`
/// does not read it.
#[serde(default)]
pub interval: Option<u64>,
}
/// JSON body of a token-endpoint response, used for both device-code polling and
/// refresh. Every field is optional, so one struct can hold either a token payload
/// or an error payload.
#[derive(Debug, Deserialize)]
struct TokenPollResponse {
/// Present (and non-empty) once a token has been issued.
#[serde(default)]
access_token: Option<String>,
#[serde(default)]
refresh_token: Option<String>,
#[serde(default)]
token_type: Option<String>,
#[serde(default)]
resource_url: Option<String>,
/// Token lifetime in seconds; callers multiply by 1000 and default to 3600 when absent.
#[serde(default)]
expires_in: Option<u64>,
/// Error code, e.g. `authorization_pending`, `slow_down`, `expired_token`, `access_denied`.
#[serde(default)]
error: Option<String>,
/// Human-readable detail included in error messages built by `poll_for_token`.
#[serde(default)]
error_description: Option<String>,
}
pub async fn start_device_flow(pkce: &PkcePair) -> anyhow::Result<DeviceCodeResponse> {
let client = reqwest::Client::new();
let resp = client
.post(QWEN_DEVICE_CODE_URL)
.header("content-type", "application/x-www-form-urlencoded")
.header("accept", "application/json")
.header("x-request-id", uuid::Uuid::new_v4().to_string())
.form(&[
("client_id", QWEN_OAUTH_CLIENT_ID),
("scope", QWEN_OAUTH_SCOPE),
("code_challenge", pkce.challenge.as_str()),
("code_challenge_method", "S256"),
])
.send()
.await?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Qwen device flow request failed ({}): {}", status, body);
}
let raw = resp.text().await?;
if raw.starts_with("<!doctype") || raw.starts_with("<!DOCTYPE") || raw.starts_with("<html") {
anyhow::bail!(
"Qwen device flow blocked by WAF (bot detection). \
Try again in a few minutes or use a different network."
);
}
let dcr: DeviceCodeResponse = serde_json::from_str(&raw)?;
Ok(dcr)
}
/// Polls the token endpoint until the user has authorized the device.
///
/// Sleeps before every attempt (2 s initially), for at most 600 attempts. On
/// `slow_down` the interval is multiplied by 1.5, capped at 10 s.
///
/// Handling per response:
/// - a non-empty `access_token` → credentials are returned, with `expiry_date` set to
///   now + `expires_in` (default 3600 s), saturating instead of overflowing;
/// - `authorization_pending`, no `error` field, an unreadable body, an unparseable
///   body, or a request timeout → keep polling;
/// - any other `error` → the flow is aborted with an error.
///
/// Each request is bounded by a 10 s total / 5 s connect timeout so a stalled
/// connection cannot block the loop forever.
///
/// # Errors
///
/// Fails on non-timeout transport errors, on an HTML (WAF) response, on a terminal
/// OAuth error, or when all attempts are exhausted.
pub async fn poll_for_token(device_code: &str, pkce: &PkcePair) -> anyhow::Result<QwenCredentials> {
    const MAX_ATTEMPTS: u32 = 600;
    // Maximum number of body bytes echoed into the parse-error log line.
    const BODY_LOG_LIMIT: usize = 500;

    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .connect_timeout(Duration::from_secs(5))
        .build()?;
    let mut interval = Duration::from_secs(2);

    for _ in 0..MAX_ATTEMPTS {
        tokio::time::sleep(interval).await;
        let sent = client
            .post(QWEN_TOKEN_URL)
            .header("content-type", "application/x-www-form-urlencoded")
            .header("accept", "application/json")
            .form(&[
                ("grant_type", DEVICE_CODE_GRANT),
                ("client_id", QWEN_OAUTH_CLIENT_ID),
                ("device_code", device_code),
                ("code_verifier", pkce.verifier.as_str()),
            ])
            .send()
            .await;
        let resp = match sent {
            Ok(r) => r,
            // A single slow poll is transient; other transport errors still abort.
            Err(e) if e.is_timeout() => {
                tracing::warn!("Qwen token poll: request timed out: {}", e);
                continue;
            }
            Err(e) => return Err(e.into()),
        };
        let status = resp.status();
        let raw = match resp.text().await {
            Ok(t) => t,
            Err(e) => {
                tracing::warn!("Qwen token poll: failed to read body: {}", e);
                continue;
            }
        };
        if raw.starts_with("<!doctype") || raw.starts_with("<!DOCTYPE") || raw.starts_with("<html")
        {
            anyhow::bail!(
                "Qwen token endpoint blocked by WAF (bot detection). \
Try again in a few minutes or use a different network."
            );
        }
        let body: TokenPollResponse = match serde_json::from_str(&raw) {
            Ok(b) => b,
            Err(e) => {
                // Slicing a `str` in the middle of a multi-byte character panics, so
                // back off to the nearest character boundary at or below the limit.
                let mut end = raw.len().min(BODY_LOG_LIMIT);
                while !raw.is_char_boundary(end) {
                    end -= 1;
                }
                tracing::warn!(
                    "Qwen token poll: parse error: {} (status={}, body={})",
                    e,
                    status,
                    &raw[..end]
                );
                continue;
            }
        };
        if let Some(access_token) = body.access_token
            && !access_token.is_empty()
        {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_millis() as u64)
                .unwrap_or(0);
            let expires_in_ms = body.expires_in.unwrap_or(3600).saturating_mul(1000);
            return Ok(QwenCredentials {
                access_token,
                token_type: body.token_type.unwrap_or_else(|| "Bearer".into()),
                refresh_token: body.refresh_token.unwrap_or_default(),
                resource_url: body.resource_url.unwrap_or_default(),
                // `expires_in_ms` may already be saturated; never wrap around.
                expiry_date: now_ms.saturating_add(expires_in_ms),
            });
        }
        match body.error.as_deref() {
            Some("authorization_pending") => continue,
            Some("slow_down") => {
                let new_secs = (interval.as_secs_f32() * 1.5).min(10.0);
                interval = Duration::from_secs_f32(new_secs);
                continue;
            }
            Some("expired_token") | Some("access_denied") => {
                anyhow::bail!(
                    "Qwen device authorization {} ({}). Please retry.",
                    body.error.as_deref().unwrap_or("failed"),
                    body.error_description.unwrap_or_default()
                );
            }
            Some(other) if status.as_u16() == 401 => {
                anyhow::bail!("Qwen device code expired ({}). Please retry.", other);
            }
            Some(other) => {
                anyhow::bail!(
                    "Qwen token poll error: {} ({})",
                    other,
                    body.error_description.unwrap_or_default()
                );
            }
            None => continue,
        }
    }
    anyhow::bail!("Qwen device flow timed out before authorization completed");
}
/// Exchanges the stored refresh token for a new access token.
///
/// Fields missing (or empty) in the response — `refresh_token`, `resource_url` — and a
/// missing `token_type` are carried over from `creds`. `expiry_date` is set to
/// now + `expires_in` (default 3600 s), saturating instead of overflowing.
///
/// # Errors
///
/// - `creds.refresh_token` is empty;
/// - the request fails or exceeds the 10 s total / 5 s connect timeout;
/// - HTTP 400, reported as an invalid refresh token (the message contains `HTTP 400`,
///   which `QwenTokenManager::start_background_refresh` matches on);
/// - any other non-success status;
/// - an HTML (WAF) body or a body that is not valid JSON;
/// - a response whose `access_token` is missing or empty.
pub async fn refresh_credentials(creds: &QwenCredentials) -> anyhow::Result<QwenCredentials> {
    if creds.refresh_token.is_empty() {
        anyhow::bail!("Qwen refresh: no refresh_token stored — re-authentication required");
    }
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(10))
        .connect_timeout(std::time::Duration::from_secs(5))
        .build()?;
    let resp = client
        .post(QWEN_TOKEN_URL)
        .header("content-type", "application/x-www-form-urlencoded")
        .header("accept", "application/json")
        .form(&[
            ("grant_type", "refresh_token"),
            ("refresh_token", creds.refresh_token.as_str()),
            ("client_id", QWEN_OAUTH_CLIENT_ID),
        ])
        .send()
        .await?;
    let status = resp.status();
    if status.as_u16() == 400 {
        anyhow::bail!(
            "Qwen refresh_token invalid (HTTP 400). Run /onboard:provider to re-authenticate."
        );
    }
    if !status.is_success() {
        let body = resp.text().await.unwrap_or_default();
        anyhow::bail!("Qwen refresh failed ({}): {}", status, body);
    }
    let raw = resp.text().await?;
    if raw.starts_with("<!doctype") || raw.starts_with("<!DOCTYPE") || raw.starts_with("<html") {
        anyhow::bail!(
            "Qwen refresh blocked by WAF (bot detection). \
Try again in a few minutes or use a different network."
        );
    }
    let body: TokenPollResponse = serde_json::from_str(&raw)?;
    // An empty token is as unusable as a missing one; storing it would make the
    // credentials look valid until the next expiry.
    let access_token = body
        .access_token
        .filter(|t| !t.is_empty())
        .ok_or_else(|| anyhow::anyhow!("Qwen refresh response missing access_token"))?;
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let expires_in_ms = body.expires_in.unwrap_or(3600).saturating_mul(1000);
    Ok(QwenCredentials {
        access_token,
        token_type: body.token_type.unwrap_or_else(|| creds.token_type.clone()),
        refresh_token: body
            .refresh_token
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| creds.refresh_token.clone()),
        resource_url: body
            .resource_url
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| creds.resource_url.clone()),
        // `expires_in_ms` may already be saturated; never wrap around.
        expiry_date: now_ms.saturating_add(expires_in_ms),
    })
}
/// Tries to open `url` with the platform's launcher and reports whether the launcher
/// process could be spawned.
///
/// - macOS: `open <url>`
/// - Windows: `cmd /C start "" <url>`
/// - everything else: `xdg-open <url>`
///
/// The child's stdout and stderr are discarded. The return value only reflects the
/// spawn itself; the child's exit status is not checked.
pub fn open_browser(url: &str) -> bool {
    use std::process::{Command, Stdio};

    let mut launcher = if cfg!(target_os = "windows") {
        let mut c = Command::new("cmd");
        c.args(["/C", "start", "", url]);
        c
    } else {
        let program = if cfg!(target_os = "macos") {
            "open"
        } else {
            "xdg-open"
        };
        let mut c = Command::new(program);
        c.arg(url);
        c
    };

    launcher
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .is_ok()
}
/// Holds the current Qwen credentials and keeps them fresh, either by refreshing them
/// or by reloading them from the credentials file when that file changes.
pub struct QwenTokenManager {
/// Credentials currently in use.
state: RwLock<QwenCredentials>,
/// Last observed modification time of the credentials file, as returned by
/// `QwenCredentials::qwen_cli_mtime` (seconds since the Unix epoch, `0` if unavailable).
last_cli_mtime: RwLock<u64>,
}
impl QwenTokenManager {
/// Creates a manager around `creds`, recording the credentials file's current
/// modification time as the baseline for later change detection.
pub fn new(creds: QwenCredentials) -> Self {
    let state = RwLock::new(creds);
    let last_cli_mtime = RwLock::new(QwenCredentials::qwen_cli_mtime());
    Self {
        state,
        last_cli_mtime,
    }
}
/// Returns a copy of the current access token, or an empty string when the state
/// lock is poisoned.
pub fn current_access_token(&self) -> String {
    match self.state.read() {
        Ok(creds) => creds.access_token.clone(),
        Err(_) => String::new(),
    }
}
/// Returns the chat-completions URL for the current credentials, or
/// `QWEN_DEFAULT_CHAT_URL` when the state lock is poisoned.
pub fn current_chat_url(&self) -> String {
    match self.state.read() {
        Ok(creds) => creds.chat_completions_url(),
        Err(_) => QWEN_DEFAULT_CHAT_URL.to_string(),
    }
}
/// Returns a clone of the current credentials.
///
/// When the state lock is poisoned, a placeholder with empty tokens, token type
/// `"Bearer"` and `expiry_date` 0 is returned instead.
pub fn snapshot(&self) -> QwenCredentials {
    if let Ok(creds) = self.state.read() {
        return creds.clone();
    }
    QwenCredentials {
        access_token: String::new(),
        token_type: "Bearer".into(),
        refresh_token: String::new(),
        resource_url: String::new(),
        expiry_date: 0,
    }
}
/// Makes sure the held access token is valid, refreshing it if necessary.
///
/// First picks up a changed credentials file, if any. If the (possibly reloaded)
/// credentials are still valid, nothing else happens. Otherwise the token is refreshed,
/// persisted to keys.toml (a failure is only logged), written back to the credentials
/// file, stored in the manager, and the recorded file mtime is updated.
///
/// # Errors
///
/// Fails when the state lock is poisoned or when the refresh request fails.
pub async fn ensure_fresh(&self) -> anyhow::Result<()> {
    self.reload_from_qwen_cli_if_changed();

    let still_valid = match self.state.read() {
        Ok(creds) => creds.is_valid(),
        Err(_) => return Err(anyhow::anyhow!("Qwen token state poisoned")),
    };
    if still_valid {
        return Ok(());
    }

    let current = self.snapshot();
    let refreshed = refresh_credentials(&current).await?;

    if let Err(e) = refreshed.persist_to_keys() {
        tracing::warn!("Failed to persist refreshed Qwen credentials: {}", e);
    }
    refreshed.write_back_to_qwen_cli();

    if let Ok(mut slot) = self.state.write() {
        *slot = refreshed;
    }
    // Record the mtime of our own write so it is not mistaken for an external update.
    if let Ok(mut seen) = self.last_cli_mtime.write() {
        *seen = QwenCredentials::qwen_cli_mtime();
    }

    tracing::debug!("Qwen access token refreshed");
    Ok(())
}
/// Refreshes the access token regardless of its remaining lifetime.
///
/// If the credentials file changed since it was last seen, the reloaded credentials
/// are used and no refresh request is made. Otherwise the token is refreshed, persisted
/// to keys.toml (a failure is only logged), written back to the credentials file,
/// stored in the manager, and the recorded file mtime is updated.
///
/// # Errors
///
/// Fails when the refresh request fails.
pub async fn force_refresh(&self) -> anyhow::Result<()> {
    let picked_up_external_update = self.reload_from_qwen_cli_if_changed();
    if picked_up_external_update {
        tracing::info!("Qwen: picked up rotated token from qwen-cli — no refresh needed");
        return Ok(());
    }

    let current = self.snapshot();
    let refreshed = refresh_credentials(&current).await?;

    if let Err(e) = refreshed.persist_to_keys() {
        tracing::warn!("Failed to persist force-refreshed Qwen credentials: {}", e);
    }
    refreshed.write_back_to_qwen_cli();

    if let Ok(mut slot) = self.state.write() {
        *slot = refreshed;
    }
    // Record the mtime of our own write so it is not mistaken for an external update.
    if let Ok(mut seen) = self.last_cli_mtime.write() {
        *seen = QwenCredentials::qwen_cli_mtime();
    }
    Ok(())
}
/// Reloads credentials from the credentials file when its modification time is newer
/// than the one last recorded.
///
/// Returns `true` only when new credentials were loaded. On success they are persisted
/// to keys.toml (a failure is only logged), stored in the manager, and the recorded
/// mtime is advanced. When the file changed but cannot be imported, a warning is logged
/// and the recorded mtime is left as it was.
fn reload_from_qwen_cli_if_changed(&self) -> bool {
    let on_disk = QwenCredentials::qwen_cli_mtime();
    // 0 means the file (or its timestamp) is unavailable.
    if on_disk == 0 {
        return false;
    }
    let seen = match self.last_cli_mtime.read() {
        Ok(guard) => *guard,
        Err(_) => 0,
    };
    if on_disk <= seen {
        return false;
    }

    let Some(fresh) = QwenCredentials::import_from_qwen_cli() else {
        tracing::warn!("Qwen: oauth_creds.json changed but could not be parsed");
        return false;
    };

    tracing::info!(
        "Qwen: detected oauth_creds.json update (mtime {} -> {}) — reloading",
        seen,
        on_disk
    );
    if let Err(e) = fresh.persist_to_keys() {
        tracing::warn!("Failed to persist reloaded Qwen creds to keys.toml: {}", e);
    }
    if let Ok(mut slot) = self.state.write() {
        *slot = fresh;
    }
    if let Ok(mut recorded) = self.last_cli_mtime.write() {
        *recorded = on_disk;
    }
    true
}
/// Spawns a Tokio task that keeps the credentials fresh for the lifetime of the task.
///
/// The task refreshes once immediately, then wakes every 30 seconds to:
/// 1. pick up a changed credentials file (and, if one was loaded, go back to sleep);
/// 2. refresh the token when it is no longer valid.
///
/// When a refresh fails with an error mentioning `HTTP 400`, the refresh token is
/// treated as permanently dead: stored credentials are wiped and the task stops.
/// Any other failure is logged and followed by an extra 30-second pause.
///
/// The tick is a fixed 30 seconds. The previous code derived it from the remaining
/// token lifetime but clamped the result with `max(30)` followed by `min(30)`, which
/// always produced 30; that dead computation has been removed.
pub fn start_background_refresh(self: Arc<Self>) {
    const TICK: Duration = Duration::from_secs(30);

    tokio::spawn(async move {
        if let Err(e) = self.ensure_fresh().await {
            tracing::warn!("Qwen initial token refresh failed: {}", e);
        }
        loop {
            tokio::time::sleep(TICK).await;
            if self.reload_from_qwen_cli_if_changed() {
                continue;
            }
            if self.snapshot().is_valid() {
                continue;
            }
            if let Err(e) = self.ensure_fresh().await {
                // `refresh_credentials` puts "HTTP 400" in the message for a rejected
                // refresh token.
                if e.to_string().contains("HTTP 400") {
                    tracing::error!(
                        "Qwen refresh_token permanently dead — wiping credentials. \
Run /onboard:provider to re-authenticate."
                    );
                    QwenCredentials::wipe_dead_credentials();
                    break;
                }
                tracing::warn!("Qwen background token refresh failed: {}", e);
                tokio::time::sleep(TICK).await;
            }
        }
    });
}
}
/// Version string embedded in the `QwenCode/<version>` user agent built by
/// `qwen_extra_headers`.
const QWEN_CLI_VERSION: &str = "0.14.0";
/// Translates the compile-time CPU architecture name into the spelling used in the
/// user-agent string (`x86_64` → `x64`, `aarch64` → `arm64`, `x86` → `ia32`).
/// Any other architecture name is passed through unchanged.
fn node_arch() -> &'static str {
    const RENAMED: [(&str, &str); 3] = [("x86_64", "x64"), ("aarch64", "arm64"), ("x86", "ia32")];

    let arch = std::env::consts::ARCH;
    RENAMED
        .iter()
        .find(|(rust_name, _)| *rust_name == arch)
        .map_or(arch, |&(_, node_name)| node_name)
}
/// Builds the `<os>; <arch>` fragment of the user agent.
///
/// `macos` is reported as `darwin` and `windows` as `win32`; other OS names are used
/// as-is. The architecture comes from `node_arch`.
fn user_agent_platform() -> String {
    let raw_os = std::env::consts::OS;
    let os = if raw_os == "macos" {
        "darwin"
    } else if raw_os == "windows" {
        "win32"
    } else {
        raw_os
    };
    [os, node_arch()].join("; ")
}
/// Returns the four extra HTTP headers attached to Qwen requests, in this order:
/// `User-Agent`, `X-DashScope-CacheControl`, `X-DashScope-UserAgent`,
/// `X-DashScope-AuthType`. Both user-agent headers carry the same value.
pub fn qwen_extra_headers() -> Vec<(String, String)> {
    let ua = format!("QwenCode/{} ({})", QWEN_CLI_VERSION, user_agent_platform());
    let pairs: [(&str, &str); 4] = [
        ("User-Agent", ua.as_str()),
        ("X-DashScope-CacheControl", "enable"),
        ("X-DashScope-UserAgent", ua.as_str()),
        ("X-DashScope-AuthType", "qwen-oauth"),
    ];
    pairs
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
}
/// Returns a session identifier that is generated (as a v4 UUID) on first use and then
/// stays the same for the rest of the process.
fn qwen_session_id() -> &'static str {
    static SESSION: std::sync::OnceLock<String> = std::sync::OnceLock::new();
    let id: &'static String = SESSION.get_or_init(|| uuid::Uuid::new_v4().to_string());
    id.as_str()
}
/// Produces a 13-character lowercase hex identifier derived from the current time.
///
/// The nanosecond timestamp (truncated to 64 bits) is XOR-ed with a constant, scrambled
/// with three shift/XOR steps, masked to 52 bits and zero-padded to 13 hex digits.
/// If the clock reads earlier than the Unix epoch, the timestamp is taken as 0.
fn qwen_prompt_id() -> String {
    let nanos: u128 = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };

    let seed = (nanos as u64) ^ 0x9E37_79B9_7F4A_7C15;
    let step1 = seed ^ (seed << 13);
    let step2 = step1 ^ (step1 >> 7);
    let step3 = step2 ^ (step2 << 17);

    // 52 bits == 13 hex digits.
    let masked = step3 & 0x000F_FFFF_FFFF_FFFF;
    format!("{:013x}", masked)
}
/// Reports whether `model` is treated as vision-capable.
///
/// Matching is ASCII case-insensitive: the name must equal `coder-model` or start with
/// `qwen-vl`, `qwen3-vl-plus` or `qwen3.5-plus`.
fn is_vision_model(model: &str) -> bool {
    const VISION_PREFIXES: [&str; 3] = ["qwen-vl", "qwen3-vl-plus", "qwen3.5-plus"];

    let lowered = model.to_ascii_lowercase();
    lowered == "coder-model"
        || VISION_PREFIXES
            .iter()
            .any(|prefix| lowered.starts_with(prefix))
}
/// Converts message content into a list of content parts.
///
/// A string becomes a single `{ "type": "text", "text": ... }` part, an array is cloned
/// as-is, and any other JSON value yields an empty list.
fn normalize_content_to_array(content: &serde_json::Value) -> Vec<serde_json::Value> {
    if let Some(text) = content.as_str() {
        return vec![serde_json::json!({ "type": "text", "text": text })];
    }
    content.as_array().cloned().unwrap_or_default()
}
/// Returns `content` as an array of parts with `cache_control: { "type": "ephemeral" }`
/// set on the last part.
///
/// The content is first normalized with `normalize_content_to_array`. If the list is
/// empty, or its last element is not a JSON object, nothing is tagged.
fn add_cache_control_to_content(content: &serde_json::Value) -> serde_json::Value {
    let mut parts = normalize_content_to_array(content);
    if let Some(serde_json::Value::Object(fields)) = parts.last_mut() {
        fields.insert(
            "cache_control".to_string(),
            serde_json::json!({ "type": "ephemeral" }),
        );
    }
    serde_json::Value::Array(parts)
}
/// Rewrites a chat-completions request body with the Qwen-specific additions.
///
/// A body that is not a JSON object is returned unchanged. Otherwise:
/// 1. the first `system` message — and, for streaming requests, the last message — get
///    their content converted to an array with `cache_control` on the final part
///    (messages with missing or `null` content, or that are not objects, are skipped);
/// 2. `metadata` is set to `{ sessionId, promptId }`;
/// 3. `vl_high_resolution_images: true` is added when the model is a vision model;
/// 4. for streaming requests, the last tool gets `cache_control`.
///
/// "Streaming" means the `stream` field is the boolean `true`.
pub fn qwen_body_transform(mut body: serde_json::Value) -> serde_json::Value {
    let obj = match body.as_object_mut() {
        Some(o) => o,
        None => return body,
    };
    let is_streaming = obj.get("stream").and_then(|v| v.as_bool()).unwrap_or(false);

    if let Some(serde_json::Value::Array(messages)) = obj.get_mut("messages")
        && let Some(tail) = messages.len().checked_sub(1)
    {
        let first_system = messages
            .iter()
            .position(|m| m.get("role").and_then(|r| r.as_str()) == Some("system"));

        // At most two messages are touched; avoid listing the same index twice.
        let mut targets: Vec<usize> = Vec::with_capacity(2);
        if let Some(idx) = first_system {
            targets.push(idx);
        }
        if is_streaming && first_system != Some(tail) {
            targets.push(tail);
        }

        for idx in targets {
            let Some(entry) = messages[idx].as_object_mut() else {
                continue;
            };
            let tagged = match entry.get("content") {
                Some(c) if !c.is_null() => add_cache_control_to_content(c),
                _ => continue,
            };
            entry.insert("content".to_string(), tagged);
        }
    }

    obj.insert(
        "metadata".to_string(),
        serde_json::json!({
            "sessionId": qwen_session_id(),
            "promptId": qwen_prompt_id(),
        }),
    );

    let wants_high_res = obj
        .get("model")
        .and_then(|v| v.as_str())
        .is_some_and(|name| is_vision_model(name));
    if wants_high_res {
        obj.insert(
            "vl_high_resolution_images".to_string(),
            serde_json::Value::Bool(true),
        );
    }

    if is_streaming
        && let Some(serde_json::Value::Array(tools)) = obj.get_mut("tools")
        && let Some(serde_json::Value::Object(last_tool)) = tools.last_mut()
    {
        last_tool.insert(
            "cache_control".to_string(),
            serde_json::json!({ "type": "ephemeral" }),
        );
    }
    body
}
// Unit tests for PKCE generation, URL building, token validity, the extra request
// headers and the request-body transform.
#[cfg(test)]
mod tests {
use super::*;
// 32 bytes encode to 43 unpadded base64 characters, for both verifier and challenge.
#[test]
fn pkce_pair_is_well_formed() {
let p = PkcePair::generate();
assert_eq!(p.verifier.len(), 43);
assert_eq!(p.challenge.len(), 43);
assert!(!p.verifier.contains('='));
assert!(!p.challenge.contains('='));
}
// An empty resource_url falls back to the default host, which already ends in /v1.
#[test]
fn chat_url_default() {
let creds = QwenCredentials {
access_token: "x".into(),
token_type: "Bearer".into(),
refresh_token: "y".into(),
resource_url: String::new(),
expiry_date: 0,
};
let url = creds.chat_completions_url();
assert!(url.ends_with("/v1/chat/completions"));
assert!(url.starts_with("https://"));
}
// A bare host gets both the https scheme and the /v1 segment added.
#[test]
fn chat_url_from_resource_url() {
let creds = QwenCredentials {
access_token: "x".into(),
token_type: "Bearer".into(),
refresh_token: "y".into(),
resource_url: "portal.qwen.ai".into(),
expiry_date: 0,
};
assert_eq!(
creds.chat_completions_url(),
"https://portal.qwen.ai/v1/chat/completions"
);
}
// A base that already ends in /v1 must not get a second /v1.
#[test]
fn chat_url_handles_existing_v1() {
let creds = QwenCredentials {
access_token: "x".into(),
token_type: "Bearer".into(),
refresh_token: "y".into(),
resource_url: "https://example.com/v1".into(),
expiry_date: 0,
};
assert_eq!(
creds.chat_completions_url(),
"https://example.com/v1/chat/completions"
);
}
// expiry_date 0 is always in the past.
#[test]
fn is_valid_returns_false_for_expired() {
let creds = QwenCredentials {
access_token: "x".into(),
token_type: "Bearer".into(),
refresh_token: "y".into(),
resource_url: String::new(),
expiry_date: 0,
};
assert!(!creds.is_valid());
}
// Exactly four headers; both user-agent headers carry the same QwenCode/... value.
#[test]
fn extra_headers_match_qwen_cli_exactly() {
let h = qwen_extra_headers();
let names: Vec<&str> = h.iter().map(|(k, _)| k.as_str()).collect();
assert_eq!(h.len(), 4, "expected exactly 4 headers, got {:?}", names);
assert!(names.contains(&"User-Agent"));
assert!(names.contains(&"X-DashScope-CacheControl"));
assert!(names.contains(&"X-DashScope-UserAgent"));
assert!(names.contains(&"X-DashScope-AuthType"));
let ua = h
.iter()
.find(|(k, _)| k == "User-Agent")
.map(|(_, v)| v.clone())
.unwrap();
let ds_ua = h
.iter()
.find(|(k, _)| k == "X-DashScope-UserAgent")
.map(|(_, v)| v.clone())
.unwrap();
assert_eq!(ua, ds_ua);
assert!(ua.starts_with("QwenCode/"));
let auth = h
.iter()
.find(|(k, _)| k == "X-DashScope-AuthType")
.map(|(_, v)| v.clone())
.unwrap();
assert_eq!(auth, "qwen-oauth");
}
// Shared fixture: a streaming request with a system message, three further messages
// and two tools.
fn sample_body() -> serde_json::Value {
serde_json::json!({
"model": "coder-model",
"messages": [
{ "role": "system", "content": "sys prompt" },
{ "role": "user", "content": "first user" },
{ "role": "assistant", "content": "asst reply" },
{ "role": "user", "content": "last user" }
],
"temperature": 0.7,
"top_p": 0.95,
"tool_choice": "auto",
"max_completion_tokens": 8192,
"include_reasoning": true,
"stream": true,
"stream_options": { "include_usage": true },
"tools": [
{
"type": "function",
"function": { "name": "first_tool", "description": "", "parameters": {} }
},
{
"type": "function",
"function": { "name": "last_tool", "description": "", "parameters": {} }
}
]
})
}
// Streaming: the system message and the last message are tagged; the ones in between
// keep their plain string content.
#[test]
fn body_transform_cache_control_streaming_system_and_last_message() {
let out = qwen_body_transform(sample_body());
let msgs = out.get("messages").and_then(|v| v.as_array()).unwrap();
let sys = &msgs[0];
assert_eq!(sys["role"], "system");
assert!(sys["content"].is_array());
assert_eq!(sys["content"][0]["type"], "text");
assert_eq!(sys["content"][0]["cache_control"]["type"], "ephemeral");
let u1 = &msgs[1];
assert!(
u1["content"].is_string(),
"first user should stay as plain string, got {:?}",
u1["content"]
);
let asst = &msgs[2];
assert!(asst["content"].is_string());
let u2 = &msgs[3];
assert!(u2["content"].is_array());
assert_eq!(u2["content"][0]["cache_control"]["type"], "ephemeral");
}
// Non-streaming: only the system message is tagged.
#[test]
fn body_transform_non_streaming_only_tags_system() {
let mut body = sample_body();
body["stream"] = serde_json::json!(false);
let out = qwen_body_transform(body);
let msgs = out.get("messages").and_then(|v| v.as_array()).unwrap();
assert!(msgs[0]["content"].is_array());
assert_eq!(msgs[0]["content"][0]["cache_control"]["type"], "ephemeral");
assert!(msgs[3]["content"].is_string());
}
// Fields the transform does not own must pass through untouched.
#[test]
fn body_transform_preserves_all_fields() {
let out = qwen_body_transform(sample_body());
let obj = out.as_object().unwrap();
assert_eq!(obj.get("temperature"), Some(&serde_json::json!(0.7)));
assert_eq!(obj.get("top_p"), Some(&serde_json::json!(0.95)));
assert_eq!(obj.get("tool_choice"), Some(&serde_json::json!("auto")));
assert_eq!(
obj.get("max_completion_tokens"),
Some(&serde_json::json!(8192))
);
assert_eq!(obj.get("include_reasoning"), Some(&serde_json::json!(true)));
}
// metadata carries string sessionId and promptId.
#[test]
fn body_transform_adds_metadata_with_session_and_prompt_ids() {
let out = qwen_body_transform(sample_body());
let meta = out.get("metadata").unwrap();
assert!(meta["sessionId"].is_string());
assert!(meta["promptId"].is_string());
}
// The high-resolution flag is added for vision models and absent otherwise.
#[test]
fn body_transform_vl_flag_only_for_vision_models() {
let out = qwen_body_transform(sample_body());
assert_eq!(out["vl_high_resolution_images"], true);
let mut body = sample_body();
body["model"] = serde_json::json!("qwen3-32b");
let out = qwen_body_transform(body);
assert!(
out.as_object()
.unwrap()
.get("vl_high_resolution_images")
.is_none(),
"text-only model should not carry vl_high_resolution_images"
);
}
// The transform must not introduce max_tokens on its own.
#[test]
fn body_transform_does_not_force_max_tokens() {
let mut body = sample_body();
body.as_object_mut().unwrap().remove("max_tokens");
let out = qwen_body_transform(body);
assert!(out.as_object().unwrap().get("max_tokens").is_none());
}
// Only the last tool is tagged, and only for streaming requests.
#[test]
fn body_transform_tags_last_tool_only_when_streaming() {
let out = qwen_body_transform(sample_body());
let tools = out.get("tools").and_then(|v| v.as_array()).unwrap();
assert!(tools[0].get("cache_control").is_none());
assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
let mut body = sample_body();
body["stream"] = serde_json::json!(false);
let out = qwen_body_transform(body);
let tools = out.get("tools").and_then(|v| v.as_array()).unwrap();
assert!(tools[0].get("cache_control").is_none());
assert!(tools[1].get("cache_control").is_none());
}
// A caller-supplied max_tokens is kept as-is.
#[test]
fn body_transform_preserves_existing_max_tokens() {
let mut body = sample_body();
body["max_tokens"] = serde_json::json!(4096);
let out = qwen_body_transform(body);
assert_eq!(out["max_tokens"], 4096);
}
// Array content keeps its parts; cache_control lands on the last part, whatever its type.
#[test]
fn body_transform_cache_control_on_multimodal_last_message() {
let body = serde_json::json!({
"model": "coder-model",
"stream": true,
"messages": [
{ "role": "system", "content": "sys" },
{
"role": "user",
"content": [
{ "type": "text", "text": "look at this" },
{ "type": "image_url", "image_url": { "url": "data:..." } }
]
}
]
});
let out = qwen_body_transform(body);
let msgs = out["messages"].as_array().unwrap();
let u = &msgs[1];
let content = u["content"].as_array().unwrap();
assert_eq!(content.len(), 2);
assert_eq!(content[0]["type"], "text");
assert_eq!(content[1]["type"], "image_url");
assert_eq!(content[1]["cache_control"]["type"], "ephemeral");
}
// Exact name, prefix matches and case-insensitivity, plus a few non-vision names.
#[test]
fn is_vision_model_matches_qwen_cli_list() {
assert!(is_vision_model("coder-model"));
assert!(is_vision_model("qwen-vl-max"));
assert!(is_vision_model("qwen-vl-max-latest"));
assert!(is_vision_model("qwen3-vl-plus"));
assert!(is_vision_model("qwen3.5-plus"));
assert!(is_vision_model("CODER-MODEL")); assert!(!is_vision_model("qwen3-32b"));
assert!(!is_vision_model("qwen-max"));
assert!(!is_vision_model(""));
}
// The session id is generated once and reused.
#[test]
fn session_id_is_stable_within_process() {
let a = qwen_session_id();
let b = qwen_session_id();
assert_eq!(a, b);
}
// The prompt id is always 13 hex digits.
#[test]
fn prompt_id_is_13_hex_chars() {
let id = qwen_prompt_id();
assert_eq!(id.len(), 13);
assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
}
}