1use openapi_contract::sse::SseStream;
2use openapi_contract::{ApiClient, ApiError, Method};
3use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7fn cloud_debug_enabled() -> bool {
15 crate::env::debug_cloud()
16}
17
18static AUTH_POOL_CACHE: tokio::sync::Mutex<Option<HashMap<PathBuf, SqlitePool>>> =
19 tokio::sync::Mutex::const_new(None);
20
21const AUTH_TOKEN_KEY: &str = "token";
22const AUTH_REFRESH_TOKEN_KEY: &str = "refresh_token";
23const AUTH_HOST_KEY: &str = "token_host";
25const CLI_CLIENT_ID: &str = "difflore-cli";
26const PAST_VERDICT_RECALL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(45);
27const PAST_VERDICT_RETRY_DELAYS_MS: &[u64] = &[100, 300, 700];
28
29#[derive(serde::Deserialize)]
30#[serde(rename_all = "camelCase")]
31struct TokenRefreshResponse {
32 token: String,
33 refresh_token: Option<String>,
34}
35
36#[derive(Clone, Copy)]
39enum SendPhase {
40 Initial,
42 Retry,
44}
45
46fn escape_path_id(s: &str) -> String {
49 s.replace('%', "%25")
50 .replace('/', "%2F")
51 .replace('#', "%23")
52 .replace('?', "%3F")
53}
54
55fn scrub_tokens_from_body(body: &str, tokens: &[Option<&str>]) -> String {
62 let mut out = body.to_owned();
63 for &token in tokens.iter().flatten() {
64 if token.len() >= 8 {
65 out = out.replace(token, "[REDACTED-TOKEN]");
66 }
67 }
68 out
69}
70
71fn truncate_for_error(body: &str, max_chars: usize) -> String {
72 if body.chars().count() <= max_chars {
73 return body.to_owned();
74 }
75 body.chars().take(max_chars).collect()
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
96pub(crate) struct HttpFailure {
97 pub status: u16,
98 pub reason_phrase: String,
99 pub body_snippet: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
109pub(crate) enum OutboxFailure {
110 Http(HttpFailure),
111 Transport(String),
115}
116
117impl OutboxFailure {
118 pub fn format_for_outbox_last_error(&self) -> String {
132 match self {
133 Self::Http(http) => {
134 if http.body_snippet.is_empty() {
135 format!("{} {}", http.status, http.reason_phrase)
136 } else {
137 format!(
138 "{} {}: {}",
139 http.status, http.reason_phrase, http.body_snippet
140 )
141 }
142 }
143 Self::Transport(msg) => format!("transport: {msg}"),
147 }
148 }
149}
150
151pub(crate) fn normalize_body_snippet(body: &str, max_chars: usize) -> String {
160 let collapsed: String = body.split_whitespace().collect::<Vec<_>>().join(" ");
161 if collapsed.chars().count() <= max_chars {
162 return collapsed;
163 }
164 collapsed.chars().take(max_chars).collect()
165}
166
167use crate::context::types::PastVerdict;
168use crate::crypto::{decrypt_secret, encrypt_secret};
169
170use super::endpoints::pricing_url;
171
172use super::api_types::{
173 GetTrajectoryResponse, ImpactBannerDto, ImpactCoverageDto, ImpactFixScorecardDto,
174 ImpactTopRulesDto, ImpactWeeklyDto, PastVerdictDto, RecallPastVerdictsRequest,
175 RecordAcceptedEditRequest, RecordAcceptedEditResponse, RecordReviewMetricsRequest,
176 SaveTrajectoryRequest, UploadImportedReviewsRequest,
177};
178
179#[derive(Clone)]
180pub struct CloudClient {
181 client: reqwest::Client,
182 base_url: String,
183 token: Option<String>,
184}
185
186impl Default for CloudClient {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
192impl CloudClient {
193 #[allow(clippy::panic)]
194 pub fn new() -> Self {
196 let base_url = Self::resolve_cloud_url();
197 Self {
198 client: reqwest::Client::builder()
199 .timeout(std::time::Duration::from_secs(30))
200 .build()
201 .unwrap_or_else(|e| {
202 panic!("failed to build cloud HTTP client with 30s timeout: {e}")
203 }),
204 base_url,
205 token: None,
206 }
207 }
208
209 pub async fn create() -> Self {
210 let mut client = Self::new();
211 client.token = Self::load_token().await;
212 client
213 }
214
215 pub fn resolve_cloud_url() -> String {
216 super::endpoints::api_base()
217 }
218
219 fn auth_db_path() -> Result<PathBuf, String> {
220 Ok(crate::paths::data_home()?.join("cloud-auth.db"))
223 }
224
225 pub async fn auth_pool() -> Result<SqlitePool, String> {
226 let path = Self::auth_db_path()?;
227 let mut guard = AUTH_POOL_CACHE.lock().await;
228 let cache = guard.get_or_insert_with(HashMap::new);
229 if let Some(pool) = cache.get(&path) {
230 return Ok(pool.clone());
231 }
232
233 if let Some(parent) = path.parent() {
234 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
235 crate::infra::db::restrict_to_owner(parent, true);
236 }
237
238 let opts = sqlx::sqlite::SqliteConnectOptions::new()
239 .filename(&path)
240 .create_if_missing(true);
241
242 let pool = SqlitePoolOptions::new()
243 .max_connections(1)
244 .connect_with(opts)
245 .await
246 .map_err(|e| e.to_string())?;
247
248 crate::infra::db::restrict_sqlite_files(&path);
251
252 sqlx::query!(
255 "CREATE TABLE IF NOT EXISTS auth (\
256 key TEXT PRIMARY KEY NOT NULL, \
257 value TEXT NOT NULL\
258 )"
259 )
260 .execute(&pool)
261 .await
262 .map_err(|e| format!("auth table create failed: {e}"))?;
263
264 cache.insert(path, pool.clone());
265 Ok(pool)
266 }
267
268 pub async fn auth_pool_public() -> Result<SqlitePool, String> {
269 Self::auth_pool().await
270 }
271
272 async fn save_encrypted_auth_key(key: &str, value: &str) -> Result<(), String> {
273 let encrypted = encrypt_secret(value)?;
274 let pool = Self::auth_pool().await?;
275 sqlx::query("INSERT OR REPLACE INTO auth (key, value) VALUES (?1, ?2)")
276 .bind(key)
277 .bind(encrypted)
278 .execute(&pool)
279 .await
280 .map_err(|e| e.to_string())?;
281 Ok(())
282 }
283
284 async fn load_encrypted_auth_key(key: &str, quiet: bool) -> Option<String> {
285 let pool = Self::auth_pool().await.ok()?;
286 let raw: String = sqlx::query_scalar("SELECT value FROM auth WHERE key = ?1")
287 .bind(key)
288 .fetch_optional(&pool)
289 .await
290 .ok()??;
291
292 match decrypt_secret(&raw) {
293 Ok(plaintext) => Some(plaintext),
294 Err(e) => {
295 if !quiet {
300 eprintln!(
301 "Token storage could not be decrypted: {e}. \
302 DiffLore left the stored token untouched; set DIFFLORE_MASTER_KEY if this is CI, \
303 or run `difflore cloud logout` then `difflore cloud login` to replace it."
304 );
305 }
306 None
307 }
308 }
309 }
310
311 async fn delete_auth_key(key: &str) -> Result<(), String> {
312 let pool = Self::auth_pool().await?;
313 sqlx::query("DELETE FROM auth WHERE key = ?1")
314 .bind(key)
315 .execute(&pool)
316 .await
317 .map_err(|e| e.to_string())?;
318 Ok(())
319 }
320
321 pub async fn save_token(token: &str) -> Result<(), String> {
322 Self::save_encrypted_auth_key(AUTH_TOKEN_KEY, token).await?;
323 let pool = Self::auth_pool().await?;
324 sqlx::query!("DELETE FROM auth WHERE key = 'login_nonce'")
325 .execute(&pool)
326 .await
327 .map_err(|e| e.to_string())?;
328 Ok(())
329 }
330
331 pub async fn save_refresh_token(refresh_token: &str) -> Result<(), String> {
332 Self::save_encrypted_auth_key(AUTH_REFRESH_TOKEN_KEY, refresh_token).await
333 }
334
335 pub async fn save_login_tokens(token: &str, refresh_token: Option<&str>) -> Result<(), String> {
336 Self::save_token(token).await?;
337 match refresh_token.map(str::trim).filter(|s| !s.is_empty()) {
338 Some(refresh_token) => Self::save_refresh_token(refresh_token).await?,
339 None => Self::delete_auth_key(AUTH_REFRESH_TOKEN_KEY).await?,
340 }
341 Self::save_encrypted_auth_key(AUTH_HOST_KEY, &super::endpoints::api_origin()).await?;
343 Ok(())
344 }
345
346 pub async fn load_token() -> Option<String> {
347 if let Some(token) = crate::env::non_empty(crate::env::DIFFLORE_TOKEN) {
351 return Some(token);
352 }
353
354 let token = Self::load_encrypted_auth_key(AUTH_TOKEN_KEY, false).await?;
355 Self::saved_credential_host_matches_current(false)
356 .await
357 .then_some(token)
358 }
359
360 pub async fn load_token_quiet() -> Option<String> {
364 if let Some(token) = crate::env::non_empty(crate::env::DIFFLORE_TOKEN) {
365 return Some(token);
366 }
367
368 let token = Self::load_encrypted_auth_key(AUTH_TOKEN_KEY, true).await?;
369 Self::saved_credential_host_matches_current(true)
370 .await
371 .then_some(token)
372 }
373
374 pub async fn load_refresh_token() -> Option<String> {
375 let token = Self::load_encrypted_auth_key(AUTH_REFRESH_TOKEN_KEY, false).await?;
376 Self::saved_credential_host_matches_current(false)
377 .await
378 .then_some(token)
379 }
380
381 async fn saved_credential_host_matches_current(quiet: bool) -> bool {
385 let current = super::endpoints::api_origin();
386 match Self::load_encrypted_auth_key(AUTH_HOST_KEY, quiet).await {
387 Some(stored) => stored == current,
388 None => current == super::endpoints::default_api_origin(),
389 }
390 }
391
392 pub async fn clear_token() -> Result<(), String> {
393 Self::delete_auth_key(AUTH_TOKEN_KEY).await?;
394 Self::delete_auth_key(AUTH_REFRESH_TOKEN_KEY).await?;
395 Self::delete_auth_key(AUTH_HOST_KEY).await
396 }
397
398 pub async fn refresh_saved_token() -> Option<String> {
399 let refresh_token = Self::load_refresh_token().await?;
400 let client = reqwest::Client::builder()
401 .timeout(std::time::Duration::from_secs(15))
402 .build()
403 .ok()?;
404 let url = format!(
405 "{}/token/refresh",
406 Self::resolve_cloud_url().trim_end_matches('/')
407 );
408 let resp = match client
409 .post(url)
410 .header("content-type", "application/json")
411 .json(&serde_json::json!({
412 "clientId": CLI_CLIENT_ID,
413 "refreshToken": refresh_token,
414 }))
415 .send()
416 .await
417 {
418 Ok(resp) => resp,
419 Err(e) => {
420 if cloud_debug_enabled() {
421 eprintln!("[cloud-client] token refresh network error: {e}");
422 }
423 return None;
424 }
425 };
426 let status = resp.status();
427 if !status.is_success() {
428 if cloud_debug_enabled() {
429 let raw = resp.text().await.unwrap_or_default();
430 let body = scrub_tokens_from_body(&raw, &[Some(refresh_token.as_str())]);
432 eprintln!(
433 "[cloud-client] token refresh returned {status}: {}",
434 truncate_for_error(&body, 500)
435 );
436 }
437 return None;
438 }
439 let body = match resp.json::<TokenRefreshResponse>().await {
440 Ok(body) => body,
441 Err(e) => {
442 if cloud_debug_enabled() {
443 eprintln!("[cloud-client] token refresh decode error: {e}");
444 }
445 return None;
446 }
447 };
448 if Self::save_login_tokens(&body.token, body.refresh_token.as_deref())
449 .await
450 .is_err()
451 {
452 return None;
453 }
454 Some(body.token)
455 }
456
457 async fn send_with_refresh<F>(
473 build: F,
474 ) -> Result<reqwest::Response, (SendPhase, reqwest::Error)>
475 where
476 F: Fn(Option<&str>) -> reqwest::RequestBuilder,
477 {
478 let resp = build(None)
479 .send()
480 .await
481 .map_err(|e| (SendPhase::Initial, e))?;
482 if resp.status() == reqwest::StatusCode::UNAUTHORIZED
483 && let Some(refreshed_token) = Self::refresh_saved_token().await
484 {
485 return build(Some(&refreshed_token))
486 .send()
487 .await
488 .map_err(|e| (SendPhase::Retry, e));
489 }
490 Ok(resp)
491 }
492
493 pub const fn is_logged_in(&self) -> bool {
494 self.token.is_some()
495 }
496
497 pub fn base_url(&self) -> &str {
498 &self.base_url
499 }
500
501 pub async fn recall_past_verdicts(
516 &self,
517 req: RecallPastVerdictsRequest,
518 ) -> Result<Vec<PastVerdict>, crate::CoreError> {
519 if !self.is_logged_in() {
520 return Ok(Vec::new());
521 }
522
523 let url = format!("{}/reviews/recall-past-verdicts", self.base_url);
524 let mut resp = match self
532 .send_recall_past_verdicts(&url, self.token.as_deref(), &req)
533 .await
534 {
535 Ok(r) => r,
536 Err(e) => {
537 if cloud_debug_enabled() {
538 eprintln!(
539 "[cloud-client] recall_past_verdicts network error after {} attempts: {e}",
540 PAST_VERDICT_RETRY_DELAYS_MS.len() + 1
541 );
542 }
543 return Ok(Vec::new());
544 }
545 };
546
547 let mut status = resp.status();
548 if status == reqwest::StatusCode::UNAUTHORIZED
549 && let Some(refreshed_token) = Self::refresh_saved_token().await
550 {
551 match self
552 .send_recall_past_verdicts(&url, Some(&refreshed_token), &req)
553 .await
554 {
555 Ok(r) => {
556 resp = r;
557 status = resp.status();
558 }
559 Err(e) => {
560 if cloud_debug_enabled() {
561 eprintln!(
562 "[cloud-client] recall_past_verdicts retry error after token refresh: {e}"
563 );
564 }
565 return Ok(Vec::new());
566 }
567 }
568 }
569 if status == reqwest::StatusCode::FORBIDDEN {
570 static NOTIFIED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
575 NOTIFIED.get_or_init(|| {
576 eprintln!(
577 "[difflore] Past-verdict recall skipped: this request needs more review-memory capacity or team scope. \
578 Personal Cloud Free recall still works for capped memory; see pricing at {} to expand team-wide recall.",
579 pricing_url()
580 );
581 });
582 return Ok(Vec::new());
583 }
584 if status == reqwest::StatusCode::UNAUTHORIZED {
585 if cloud_debug_enabled() {
597 static NOTIFIED_401: std::sync::OnceLock<()> = std::sync::OnceLock::new();
598 NOTIFIED_401.get_or_init(|| {
599 eprintln!(
600 "[difflore] Past-verdict recall unauthorized (401). \
601 Continuing with local rules. Set DIFFLORE_DEBUG_CLOUD=1 \
602 for transport details, or run `difflore cloud status` \
603 to verify your session."
604 );
605 });
606 }
607 return Ok(Vec::new());
608 }
609 if !status.is_success() {
610 let body = resp.text().await.unwrap_or_default();
611 static NOTIFIED_OTHER: std::sync::OnceLock<()> = std::sync::OnceLock::new();
617 NOTIFIED_OTHER.get_or_init(|| {
618 if cloud_debug_enabled() {
619 eprintln!(
620 "[difflore] Past-verdict recall unavailable ({status}). \
621 Continuing with local rules. Cloud response: {body}"
622 );
623 } else {
624 eprintln!(
625 "[difflore] Past-verdict recall unavailable ({status}); \
626 continuing with local rules. Set DIFFLORE_DEBUG_CLOUD=1 for details."
627 );
628 }
629 });
630 return Ok(Vec::new());
631 }
632
633 let dtos: Vec<PastVerdictDto> = match resp.json().await {
634 Ok(v) => v,
635 Err(e) => {
636 if cloud_debug_enabled() {
637 eprintln!("[cloud-client] recall_past_verdicts decode error: {e}");
638 }
639 return Ok(Vec::new());
640 }
641 };
642
643 Ok(dtos
644 .into_iter()
645 .map(|d| PastVerdict {
646 extraction_id: d.extraction_id,
647 code_snippet: d.code_snippet,
648 issue_text: d.issue_text,
649 status: d.status,
650 reason: d.reason,
651 similarity: d.similarity,
652 created_at: d.created_at,
653 signature: d.signature,
654 source_pr_number: d.source_pr_number,
655 source_pr_title: d.source_pr_title,
656 source_pr_url: d.source_pr_url,
657 })
658 .collect())
659 }
660
661 fn recall_past_verdicts_request(
662 &self,
663 url: &str,
664 token: Option<&str>,
665 ) -> reqwest::RequestBuilder {
666 let mut builder = self
667 .client
668 .post(url)
669 .timeout(PAST_VERDICT_RECALL_TIMEOUT)
670 .header("content-type", "application/json");
671 if let Some(token) = token {
672 builder = builder.header("Authorization", format!("Bearer {token}"));
673 }
674 builder
675 }
676
677 async fn send_recall_past_verdicts(
678 &self,
679 url: &str,
680 token: Option<&str>,
681 req: &RecallPastVerdictsRequest,
682 ) -> Result<reqwest::Response, reqwest::Error> {
683 for &delay_ms in PAST_VERDICT_RETRY_DELAYS_MS {
687 match self
688 .recall_past_verdicts_request(url, token)
689 .json(req)
690 .send()
691 .await
692 {
693 Ok(resp) => return Ok(resp),
694 Err(_) => {
695 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
696 }
697 }
698 }
699 self.recall_past_verdicts_request(url, token)
700 .json(req)
701 .send()
702 .await
703 }
704
705 async fn post_fire_and_forget<T: serde::Serialize>(
711 &self,
712 path: &str,
713 body: &T,
714 endpoint_label: &'static str,
715 ) -> bool {
716 match self
717 .post_fire_and_forget_result(path, body, endpoint_label)
718 .await
719 {
720 Ok(()) => true,
721 Err(e) => {
722 if cloud_debug_enabled() {
723 eprintln!("[cloud-client] {e}");
724 }
725 false
726 }
727 }
728 }
729
730 async fn post_fire_and_forget_result<T: serde::Serialize>(
731 &self,
732 path: &str,
733 body: &T,
734 endpoint_label: &'static str,
735 ) -> Result<(), String> {
736 if !self.is_logged_in() {
737 return Err(format!("{endpoint_label} skipped: not logged in"));
738 }
739
740 let url = format!("{}{}", self.base_url, path);
741 let resp = Self::send_with_refresh(|refreshed| {
742 let mut builder = self
743 .client
744 .post(&url)
745 .header("content-type", "application/json");
746 match refreshed {
747 Some(token) => {
748 builder = builder.header("Authorization", format!("Bearer {token}"));
749 }
750 None => {
751 if let Some(ref token) = self.token {
752 builder = builder.header("Authorization", format!("Bearer {token}"));
753 }
754 }
755 }
756 builder.json(body)
757 })
758 .await
759 .map_err(|(phase, e)| match phase {
760 SendPhase::Initial => format!("{endpoint_label} network error: {e}"),
761 SendPhase::Retry => format!("{endpoint_label} retry network error: {e}"),
762 })?;
763
764 let status = resp.status();
765 if status.is_success() {
766 return Ok(());
767 }
768
769 let body = self
770 .scrub_response_body(&resp.text().await.unwrap_or_default())
771 .await;
772 Err(format!(
773 "{endpoint_label} returned {status}: {}",
774 truncate_for_error(&body, 500)
775 ))
776 }
777
778 async fn post_fire_and_forget_outcome<T: serde::Serialize>(
793 &self,
794 path: &str,
795 body: &T,
796 endpoint_label: &'static str,
797 ) -> Result<(), OutboxFailure> {
798 if !self.is_logged_in() {
799 return Err(OutboxFailure::Transport(format!(
804 "{endpoint_label} skipped: not logged in"
805 )));
806 }
807
808 let url = format!("{}{}", self.base_url, path);
809 let resp = Self::send_with_refresh(|refreshed| {
810 let mut builder = self
811 .client
812 .post(&url)
813 .header("content-type", "application/json");
814 match refreshed {
815 Some(token) => {
816 builder = builder.header("Authorization", format!("Bearer {token}"));
817 }
818 None => {
819 if let Some(ref token) = self.token {
820 builder = builder.header("Authorization", format!("Bearer {token}"));
821 }
822 }
823 }
824 builder.json(body)
825 })
826 .await
827 .map_err(|(phase, e)| {
828 let label = match phase {
829 SendPhase::Initial => "initial",
830 SendPhase::Retry => "retry",
831 };
832 OutboxFailure::Transport(format!("{endpoint_label} {label}: {e}"))
833 })?;
834
835 let status = resp.status();
836 if status.is_success() {
837 return Ok(());
838 }
839
840 let status_code = status.as_u16();
841 let reason_phrase = status
845 .canonical_reason()
846 .map_or_else(|| status_code.to_string(), str::to_owned);
847 let body_text = self
848 .scrub_response_body(&resp.text().await.unwrap_or_default())
849 .await;
850 let body_snippet = normalize_body_snippet(&body_text, 200);
851 Err(OutboxFailure::Http(HttpFailure {
852 status: status_code,
853 reason_phrase,
854 body_snippet,
855 }))
856 }
857
858 pub(crate) async fn save_trajectory_outcome(
863 &self,
864 pr_review_id: &str,
865 steps: serde_json::Value,
866 ) -> Result<(), OutboxFailure> {
867 let req = SaveTrajectoryRequest { steps };
868 let path = format!("/reviews/{}/trajectory", escape_path_id(pr_review_id));
869 self.post_fire_and_forget_outcome(&path, &req, "save_trajectory")
870 .await
871 }
872
873 pub(crate) async fn record_review_metrics_outcome(
875 &self,
876 review_id: &str,
877 req: RecordReviewMetricsRequest,
878 ) -> Result<(), OutboxFailure> {
879 let path = format!("/reviews/{}/metrics", escape_path_id(review_id));
880 self.post_fire_and_forget_outcome(&path, &req, "record_review_metrics")
881 .await
882 }
883
884 #[allow(clippy::too_many_arguments)]
886 pub(crate) async fn track_mcp_query_outcome(
887 &self,
888 file: &str,
889 intent: Option<&str>,
890 rules_injected: usize,
891 strict_match_count: usize,
892 rule_titles: Vec<String>,
893 rule_ids: Vec<String>,
894 client_label: Option<&str>,
895 repo_full_name: Option<&str>,
896 ) -> Result<(), OutboxFailure> {
897 let titles: Vec<String> = rule_titles.into_iter().take(10).collect();
898 let ids: Vec<String> = rule_ids.into_iter().take(10).collect();
899 let body = serde_json::json!({
900 "file": file,
901 "intent": intent,
902 "rulesInjected": rules_injected,
903 "strictMatchCount": strict_match_count,
904 "ruleTitles": titles,
905 "ruleIds": ids,
906 "client": client_label.unwrap_or("mcp-server"),
907 "repoFullName": repo_full_name,
908 });
909 self.post_fire_and_forget_outcome("/dashboard/mcp-query", &body, "track_mcp_query")
910 .await
911 }
912
913 pub(crate) async fn upload_imported_reviews_outcome(
915 &self,
916 req: &UploadImportedReviewsRequest,
917 ) -> Result<(), OutboxFailure> {
918 self.post_fire_and_forget_outcome("/reviews/import", req, "upload_imported_reviews")
919 .await
920 }
921
922 pub(crate) async fn post_observations_outcome(
924 &self,
925 batch: &[super::api_types::Observation],
926 ) -> Result<(), OutboxFailure> {
927 self.post_fire_and_forget_outcome("/cloud/observations", &batch, "post_observations")
928 .await
929 }
930
931 pub async fn record_review_metrics(
942 &self,
943 review_id: &str,
944 req: RecordReviewMetricsRequest,
945 ) -> bool {
946 let path = format!("/reviews/{}/metrics", escape_path_id(review_id));
947 self.post_fire_and_forget(&path, &req, "record_review_metrics")
948 .await
949 }
950
951 pub async fn save_trajectory(&self, pr_review_id: &str, steps: serde_json::Value) -> bool {
958 let req = SaveTrajectoryRequest { steps };
959 let path = format!("/reviews/{}/trajectory", escape_path_id(pr_review_id));
960 self.post_fire_and_forget(&path, &req, "save_trajectory")
961 .await
962 }
963
964 pub async fn get_trajectory(
985 &self,
986 pr_review_id: &str,
987 ) -> Result<GetTrajectoryResponse, String> {
988 let path = format!("/reviews/{}/trajectory", escape_path_id(pr_review_id));
989 self.get_json(&path, "get_trajectory").await
990 }
991
992 #[allow(clippy::too_many_arguments)]
1002 pub async fn track_mcp_query(
1003 &self,
1004 file: &str,
1005 intent: Option<&str>,
1006 rules_injected: usize,
1007 strict_match_count: usize,
1008 rule_titles: Vec<String>,
1009 rule_ids: Vec<String>,
1010 client_label: Option<&str>,
1011 repo_full_name: Option<&str>,
1012 ) -> bool {
1013 let titles: Vec<String> = rule_titles.into_iter().take(10).collect();
1015 let ids: Vec<String> = rule_ids.into_iter().take(10).collect();
1016 let body = serde_json::json!({
1017 "file": file,
1018 "intent": intent,
1019 "rulesInjected": rules_injected,
1020 "strictMatchCount": strict_match_count,
1021 "ruleTitles": titles,
1022 "ruleIds": ids,
1023 "client": client_label.unwrap_or("mcp-server"),
1024 "repoFullName": repo_full_name,
1025 });
1026 self.post_fire_and_forget("/dashboard/mcp-query", &body, "track_mcp_query")
1027 .await
1028 }
1029
1030 pub async fn record_accepted_edit(&self, req: RecordAcceptedEditRequest) -> bool {
1037 self.record_accepted_edit_response(req).await.map_or_else(
1038 |e| {
1039 if cloud_debug_enabled() {
1040 eprintln!("[cloud-client] {e}");
1041 }
1042 false
1043 },
1044 |response| response.acceptance_recorded,
1045 )
1046 }
1047
1048 pub async fn record_accepted_edit_response(
1055 &self,
1056 req: RecordAcceptedEditRequest,
1057 ) -> Result<RecordAcceptedEditResponse, String> {
1058 self.post_json("/accepted-edits", &req, "record_accepted_edit")
1059 .await
1060 }
1061
1062 pub async fn upload_imported_reviews(&self, req: &UploadImportedReviewsRequest) -> bool {
1068 self.post_fire_and_forget("/reviews/import", req, "upload_imported_reviews")
1069 .await
1070 }
1071
1072 pub async fn post_observations(&self, batch: &[super::api_types::Observation]) -> bool {
1073 self.post_fire_and_forget("/cloud/observations", &batch, "post_observations")
1074 .await
1075 }
1076
1077 pub async fn post_observation_events(
1078 &self,
1079 batch: &[super::observations::ObservationEvent],
1080 ) -> bool {
1081 self.post_fire_and_forget("/cloud/observations", &batch, "post_observation_events")
1082 .await
1083 }
1084
1085 pub async fn post_observation_events_result(
1086 &self,
1087 batch: &[super::observations::ObservationEvent],
1088 ) -> Result<(), String> {
1089 self.post_fire_and_forget_result("/cloud/observations", &batch, "post_observation_events")
1090 .await
1091 }
1092
1093 async fn scrub_response_body(&self, body: &str) -> String {
1098 let saved = Self::load_token_quiet().await;
1099 let refresh = Self::load_refresh_token().await;
1100 scrub_tokens_from_body(
1101 body,
1102 &[self.token.as_deref(), saved.as_deref(), refresh.as_deref()],
1103 )
1104 }
1105
1106 async fn get_json<T: serde::de::DeserializeOwned>(
1107 &self,
1108 path: &str,
1109 label: &'static str,
1110 ) -> Result<T, String> {
1111 if !self.is_logged_in() {
1112 return Err("not_logged_in".to_owned());
1113 }
1114 let url = format!("{}{}", self.base_url, path);
1115 let resp = Self::send_with_refresh(|refreshed| {
1116 let mut builder = self.client.get(&url);
1117 match refreshed {
1118 Some(token) => {
1119 builder = builder.header("Authorization", format!("Bearer {token}"));
1120 }
1121 None => {
1122 if let Some(ref token) = self.token {
1123 builder = builder.header("Authorization", format!("Bearer {token}"));
1124 }
1125 }
1126 }
1127 builder
1128 })
1129 .await
1130 .map_err(|(phase, e)| match phase {
1131 SendPhase::Initial => format!("[{label}] network error: {e}"),
1132 SendPhase::Retry => format!("[{label}] retry network error: {e}"),
1133 })?;
1134 let status = resp.status();
1135 if !status.is_success() {
1136 let body = self
1137 .scrub_response_body(&resp.text().await.unwrap_or_default())
1138 .await;
1139 return Err(format!(
1140 "[{label}] returned {status}: {}",
1141 truncate_for_error(&body, 500)
1142 ));
1143 }
1144 resp.json::<T>()
1145 .await
1146 .map_err(|e| format!("[{label}] decode error: {e}"))
1147 }
1148
1149 async fn post_json<B: serde::Serialize, R: serde::de::DeserializeOwned>(
1154 &self,
1155 path: &str,
1156 body: &B,
1157 label: &'static str,
1158 ) -> Result<R, String> {
1159 if !self.is_logged_in() {
1160 return Err("not_logged_in".to_owned());
1161 }
1162 let url = format!("{}{}", self.base_url, path);
1163 let resp = Self::send_with_refresh(|refreshed| {
1164 let mut builder = self
1165 .client
1166 .post(&url)
1167 .header("content-type", "application/json");
1168 match refreshed {
1169 Some(token) => {
1170 builder = builder.header("Authorization", format!("Bearer {token}"));
1171 }
1172 None => {
1173 if let Some(ref token) = self.token {
1174 builder = builder.header("Authorization", format!("Bearer {token}"));
1175 }
1176 }
1177 }
1178 builder.json(body)
1179 })
1180 .await
1181 .map_err(|(phase, e)| match phase {
1182 SendPhase::Initial => format!("[{label}] network error: {e}"),
1183 SendPhase::Retry => format!("[{label}] retry network error: {e}"),
1184 })?;
1185 let status = resp.status();
1186 if !status.is_success() {
1187 let body = self
1188 .scrub_response_body(&resp.text().await.unwrap_or_default())
1189 .await;
1190 return Err(format!(
1191 "[{label}] returned {status}: {}",
1192 truncate_for_error(&body, 500)
1193 ));
1194 }
1195 resp.json::<R>()
1196 .await
1197 .map_err(|e| format!("[{label}] decode error: {e}"))
1198 }
1199
1200 pub async fn build_corpus(
1203 &self,
1204 req: &super::api_types::BuildCorpusRequest,
1205 ) -> Result<super::api_types::BuildCorpusResult, String> {
1206 self.post_json("/knowledge/corpus", req, "build_corpus")
1207 .await
1208 }
1209
1210 pub async fn prime_corpus(
1213 &self,
1214 corpus_id: &str,
1215 ) -> Result<super::api_types::PrimeCorpusResult, String> {
1216 let path = format!("/knowledge/corpus/{corpus_id}/prime");
1217 self.post_json(&path, &serde_json::json!({}), "prime_corpus")
1218 .await
1219 }
1220
1221 pub async fn query_corpus(
1225 &self,
1226 corpus_id: &str,
1227 question: &str,
1228 ) -> Result<super::api_types::QueryCorpusResult, String> {
1229 let path = format!("/knowledge/corpus/{corpus_id}/query");
1230 let body = super::api_types::QueryCorpusRequest {
1231 question: question.to_owned(),
1232 };
1233 self.post_json(&path, &body, "query_corpus").await
1234 }
1235
1236 pub async fn list_corpora(&self) -> Result<Vec<super::api_types::CorpusSummary>, String> {
1239 self.get_json("/knowledge/corpora", "list_corpora").await
1240 }
1241
1242 pub async fn get_impact_banner(&self) -> Result<ImpactBannerDto, String> {
1244 self.get_json("/impact/banner", "impact_banner").await
1245 }
1246
1247 pub async fn get_impact_weekly(&self) -> Result<ImpactWeeklyDto, String> {
1249 self.get_json("/impact/weekly", "impact_weekly").await
1250 }
1251
1252 pub async fn get_impact_top_rules(&self) -> Result<ImpactTopRulesDto, String> {
1254 self.get_json("/impact/top-rules", "impact_top_rules").await
1255 }
1256
1257 pub async fn get_impact_coverage(&self) -> Result<ImpactCoverageDto, String> {
1259 self.get_json("/impact/coverage", "impact_coverage").await
1260 }
1261
1262 pub async fn get_impact_fix_scorecard(&self) -> Result<ImpactFixScorecardDto, String> {
1264 self.get_json("/impact/fix-scorecard", "impact_fix_scorecard")
1265 .await
1266 }
1267}
1268
1269impl ApiClient for CloudClient {
1270 fn request(
1271 &self,
1272 method: Method,
1273 path: &str,
1274 query: Option<&str>,
1275 body: Option<String>,
1276 ) -> impl Future<Output = Result<reqwest::Response, ApiError>> + Send {
1277 let mut url = format!("{}{}", self.base_url, path);
1278 if let Some(qs) = query {
1279 url.push('?');
1280 url.push_str(qs);
1281 }
1282 let reqwest_method = method.as_reqwest();
1283 let client = self.client.clone();
1284 let token = self.token.clone();
1285 async move {
1286 Self::send_with_refresh(|refreshed| {
1287 let mut req = client.request(reqwest_method.clone(), &url);
1288 match refreshed {
1289 Some(refreshed_token) => {
1291 req = req.header("Authorization", format!("Bearer {refreshed_token}"));
1292 }
1293 None => {
1295 if let Some(ref token) = token {
1296 req = req.header("Authorization", format!("Bearer {token}"));
1297 }
1298 }
1299 }
1300 if let Some(ref b) = body {
1301 req = req
1302 .header("content-type", "application/json")
1303 .body(b.clone());
1304 }
1305 req
1306 })
1307 .await
1308 .map_err(|(_phase, e)| ApiError::from(e))
1309 }
1310 }
1311
1312 fn request_stream(
1313 &self,
1314 method: Method,
1315 path: &str,
1316 query: Option<&str>,
1317 ) -> impl Future<Output = Result<SseStream, ApiError>> + Send {
1318 let mut url = format!("{}{}", self.base_url, path);
1319 if let Some(qs) = query {
1320 url.push('?');
1321 url.push_str(qs);
1322 }
1323 let mut req = self.client.request(method.as_reqwest(), &url);
1324 if let Some(ref token) = self.token {
1325 req = req.header("Authorization", format!("Bearer {token}"));
1326 }
1327 async move {
1328 let resp = req.send().await.map_err(ApiError::from)?;
1329 let stream = resp.bytes_stream();
1330 Ok(SseStream::new(Box::pin(stream)))
1331 }
1332 }
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337 use super::{CloudClient, scrub_tokens_from_body};
1338
1339 #[test]
1340 fn scrub_tokens_redacts_every_known_token_over_8_chars() {
1341 let body = "401: bearer=secret-access-12345 refresh=secret-refresh-67890 short=abc";
1342 let scrubbed = scrub_tokens_from_body(
1343 body,
1344 &[
1345 Some("secret-access-12345"),
1346 Some("secret-refresh-67890"),
1347 Some("abc"),
1348 None,
1349 ],
1350 );
1351 assert!(
1352 !scrubbed.contains("secret-access-12345"),
1353 "bearer must be redacted"
1354 );
1355 assert!(
1356 !scrubbed.contains("secret-refresh-67890"),
1357 "refresh must be redacted"
1358 );
1359 assert!(scrubbed.contains("[REDACTED-TOKEN]"));
1360 assert!(scrubbed.contains("short=abc"));
1362 }
1363
1364 #[tokio::test]
1365 async fn auth_pool_public_reopens_same_token_store() {
1366 let _home = crate::db::shared_test_home();
1367 let pool = CloudClient::auth_pool_public()
1368 .await
1369 .expect("auth pool opens");
1370 sqlx::query("INSERT OR REPLACE INTO auth (key, value) VALUES (?1, ?2)")
1371 .bind("cache-test")
1372 .bind("cached")
1373 .execute(&pool)
1374 .await
1375 .expect("insert auth row");
1376
1377 let cached = CloudClient::auth_pool_public()
1378 .await
1379 .expect("auth pool reopens");
1380 let value: String = sqlx::query_scalar("SELECT value FROM auth WHERE key = ?1")
1381 .bind("cache-test")
1382 .fetch_one(&cached)
1383 .await
1384 .expect("read auth row");
1385 assert_eq!(value, "cached");
1386 }
1387}