use std::future::Future;
use crate::error::{OrchestratorError, Result};
use super::types::{ProjectEntry, RemoteProject};
/// Splits a project URL into its repository base URL and an optional branch.
///
/// Two branch notations are recognised:
///
/// * a `/tree/<branch>` path directly after `scheme://host/owner/repo`, e.g.
///   `https://github.com/org/repo/tree/develop`;
/// * a non-empty `#<branch>` fragment, e.g. `https://github.com/org/repo#develop`.
///
/// When both are present the `/tree/` branch wins. Only the first path segment
/// after `/tree/` is taken as the branch; anything after it (a sub-directory, or
/// the remainder of a branch name containing `/`) is discarded. The fragment,
/// including an empty one, and any trailing slashes are removed from the
/// returned base URL.
pub fn parse_project_url(url: &str) -> (String, Option<String>) {
    let (url_no_frag, frag_branch) = match url.split_once('#') {
        // An empty fragment carries no branch, but the `#` must still be dropped.
        Some((base, frag)) => (base, (!frag.is_empty()).then(|| frag.to_string())),
        None => (url, None),
    };

    if let Some(branch) = extract_tree_branch(url_no_frag) {
        // Cut where `/tree/` starts rather than counting bytes back from the end
        // of the URL: extra path segments after the branch would otherwise shift
        // the cut into the middle of the `/tree/<branch>` text.
        // `extract_tree_branch` only succeeds when `repo_path_end` does, so the
        // fallback is never taken in practice.
        let repo_end = repo_path_end(url_no_frag).unwrap_or(url_no_frag.len());
        let base = url_no_frag[..repo_end].trim_end_matches('/').to_string();
        return (base, Some(branch));
    }

    (url_no_frag.trim_end_matches('/').to_string(), frag_branch)
}

/// Returns the byte offset of the third `/` after `://`, i.e. the position where
/// `scheme://host/owner/repo` ends and any further path begins.
///
/// Returns `None` when the URL has no `://` or fewer than three slashes after it.
fn repo_path_end(url: &str) -> Option<usize> {
    let scheme_end = url.find("://")? + 3;
    url[scheme_end..]
        .match_indices('/')
        .nth(2)
        .map(|(offset, _)| scheme_end + offset)
}

/// Extracts `<branch>` from a URL of the form `scheme://host/owner/repo/tree/<branch>[/...]`.
///
/// Returns `None` when the path after the repository does not start with
/// `/tree/` or when the branch segment is empty.
fn extract_tree_branch(url: &str) -> Option<String> {
    let after_repo = &url[repo_path_end(url)?..];
    let branch = after_repo.strip_prefix("/tree/")?.split('/').next()?;
    (!branch.is_empty()).then(|| branch.to_string())
}
/// Asks the remote at `url` for its default branch via `git ls-remote --symref`.
///
/// The branch is taken from the first output line of the form
/// `ref: refs/heads/<branch>\tHEAD`.
///
/// # Errors
///
/// Returns `OrchestratorError::Io` when `git` cannot be spawned, when it exits
/// with a non-zero status (its trimmed stderr is included in the message), or
/// when the output contains no `refs/heads/` symref line.
pub async fn resolve_default_branch(url: &str) -> Result<String> {
    let output = tokio::process::Command::new("git")
        // `--` ends option parsing, so a caller-supplied URL that starts with `-`
        // (e.g. `--upload-pack=...`) is treated as a repository, not as an option.
        .args(["ls-remote", "--symref", "--", url, "HEAD"])
        .output()
        .await
        .map_err(|e| {
            OrchestratorError::Io(std::io::Error::other(format!(
                "Failed to run git ls-remote: {}",
                e
            )))
        })?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(OrchestratorError::Io(std::io::Error::other(format!(
            "Failed to resolve default branch for '{}': {}",
            url,
            stderr.trim()
        ))));
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    let default_branch = stdout.lines().find_map(|line| {
        let rest = line.strip_prefix("ref: refs/heads/")?;
        // The branch name is everything before the tab that precedes `HEAD`.
        let name = rest.split('\t').next().unwrap_or("").trim();
        (!name.is_empty()).then(|| name.to_string())
    });

    default_branch.ok_or_else(|| {
        OrchestratorError::Io(std::io::Error::other(format!(
            "Could not determine default branch for '{}': symref not found in git ls-remote output",
            url
        )))
    })
}
/// Normalises `raw_url` and decides which branch to use for it.
///
/// Branch precedence, highest first:
///
/// 1. `explicit_branch`, when given;
/// 2. a branch embedded in `raw_url` (as recognised by [`parse_project_url`]);
/// 3. whatever `resolver` returns for the normalised base URL.
///
/// `resolver` is only invoked in the third case, and its error is propagated.
/// Returns the normalised base URL together with the chosen branch.
pub async fn resolve_project_url_and_branch<F, Fut>(
    raw_url: &str,
    explicit_branch: Option<&str>,
    resolver: F,
) -> Result<(String, String)>
where
    F: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<String>>,
{
    let (base_url, url_branch) = parse_project_url(raw_url);

    let preselected = explicit_branch.map(str::to_string).or(url_branch);
    let branch = match preselected {
        Some(name) => name,
        None => resolver(base_url.clone()).await?,
    };

    Ok((base_url, branch))
}
/// HTTP client for a remote server's `/api/v1` endpoints.
///
/// NOTE(review): the derived `Debug` output includes `token`; consider a
/// redacting `Debug` impl if instances of this type may end up in logs.
#[derive(Debug, Clone)]
pub struct RemoteClient {
    /// Server base URL; endpoint paths such as `/api/v1/projects` are appended to it.
    base_url: String,
    /// Optional token sent as `Authorization: Bearer <token>` by the methods
    /// that authenticate their requests.
    token: Option<String>,
    /// `reqwest` client used for every request made through this value.
    http: reqwest::Client,
}
impl RemoteClient {
pub fn new(base_url: impl Into<String>, token: Option<String>) -> Self {
Self {
base_url: base_url.into(),
token,
http: reqwest::Client::new(),
}
}
pub fn resolve_token(token: Option<String>, token_env: Option<&str>) -> Option<String> {
if let Some(t) = token {
return Some(t);
}
if let Some(env_name) = token_env {
return std::env::var(env_name).ok();
}
None
}
fn authorized(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
if let Some(ref token) = self.token {
builder.header("Authorization", format!("Bearer {}", token))
} else {
builder
}
}
pub async fn list_projects(&self) -> Result<Vec<RemoteProject>> {
let url = format!("{}/api/v1/projects/state", self.base_url);
let req = self.http.get(&url);
let req = self.authorized(req);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to remote server '{}': {}",
self.base_url, e
)))
})?;
if !response.status().is_success() {
return Err(OrchestratorError::Io(std::io::Error::other(format!(
"Remote server returned status {}: {}",
response.status(),
response.status().canonical_reason().unwrap_or("Unknown")
))));
}
let projects: Vec<RemoteProject> = response.json().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to parse remote server response: {}",
e
)))
})?;
Ok(projects)
}
pub async fn list_all_projects(&self) -> Result<Vec<ProjectEntry>> {
let url = format!("{}/api/v1/projects", self.base_url);
let req = self.http.get(&url);
let req = self.authorized(req);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to remote server '{}': {}",
self.base_url, e
)))
})?;
if !response.status().is_success() {
return Err(OrchestratorError::Io(std::io::Error::other(format!(
"Remote server returned status {}: {}",
response.status(),
response.status().canonical_reason().unwrap_or("Unknown")
))));
}
let projects: Vec<ProjectEntry> = response.json().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to parse remote server response: {}",
e
)))
})?;
Ok(projects)
}
pub async fn sync_project(&self, project_id: &str) -> Result<()> {
let url = format!("{}/api/v1/projects/{}/git/sync", self.base_url, project_id);
let req = self.http.post(&url);
let req = self.authorized(req);
let resp = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to sync project '{}': {}",
project_id, e
)))
})?;
if !resp.status().is_success() {
return Err(OrchestratorError::Io(std::io::Error::other(format!(
"Remote server returned status {} for project sync '{}'",
resp.status(),
project_id
))));
}
Ok(())
}
pub fn ws_url(&self) -> String {
let base = self.base_url.as_str();
if let Some(rest) = base.strip_prefix("https://") {
format!("wss://{}/api/v1/ws", rest)
} else if let Some(rest) = base.strip_prefix("http://") {
format!("ws://{}/api/v1/ws", rest)
} else {
format!("{}/api/v1/ws", base)
}
}
pub fn token(&self) -> Option<&str> {
self.token.as_deref()
}
pub async fn get_project(&self, project_id: &str) -> Result<serde_json::Value> {
let url = format!("{}/api/v1/projects/{}", self.base_url, project_id);
let req = self.http.get(&url);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to server '{}': {}",
self.base_url, e
)))
})?;
Self::check_project_response(response).await
}
pub async fn list_projects_management(&self) -> Result<serde_json::Value> {
let url = format!("{}/api/v1/projects", self.base_url);
let req = self.http.get(&url);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to server '{}': {}",
self.base_url, e
)))
})?;
Self::check_project_response(response).await
}
pub async fn add_project(&self, remote_url: &str, branch: &str) -> Result<serde_json::Value> {
let url = format!("{}/api/v1/projects", self.base_url);
let body = serde_json::json!({
"remote_url": remote_url,
"branch": branch,
});
let req = self.http.post(&url).json(&body);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to server '{}': {}",
self.base_url, e
)))
})?;
Self::check_project_response(response).await
}
pub async fn delete_project(&self, project_id: &str) -> Result<serde_json::Value> {
let url = format!("{}/api/v1/projects/{}", self.base_url, project_id);
let req = self.http.delete(&url);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to server '{}': {}",
self.base_url, e
)))
})?;
Self::check_project_response(response).await
}
pub async fn git_sync(&self, project_id: &str) -> Result<serde_json::Value> {
let url = format!("{}/api/v1/projects/{}/git/sync", self.base_url, project_id);
let req = self.http.post(&url);
let response = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to connect to server '{}': {}",
self.base_url, e
)))
})?;
Self::check_project_response(response).await
}
async fn check_project_response(response: reqwest::Response) -> Result<serde_json::Value> {
let status = response.status();
if status.is_success() {
let text = response.text().await.unwrap_or_default();
if text.is_empty() {
return Ok(serde_json::Value::Null);
}
serde_json::from_str(&text).map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to parse server response: {}",
e
)))
})
} else {
let text = response.text().await.unwrap_or_default();
let detail = if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
v.get("error")
.or_else(|| v.get("message"))
.and_then(|m| m.as_str())
.map(|s| s.to_string())
.unwrap_or(text)
} else {
text
};
let label = match status.as_u16() {
401 => "Unauthorized (401)",
403 => "Forbidden (403)",
404 => "Not found (404)",
409 => "Conflict (409)",
422 => "Unprocessable entity (422)",
_ => status.canonical_reason().unwrap_or("Error"),
};
Err(OrchestratorError::Io(std::io::Error::other(format!(
"{}: {}",
label, detail
))))
}
}
pub async fn control_run(
&self,
_project_id: &str,
_changes: Option<Vec<String>>,
) -> Result<()> {
let url = format!("{}/api/v1/control/run", self.base_url);
let req = self.http.post(&url);
let req = self.authorized(req);
let resp = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to call remote run control: {}",
e
)))
})?;
if !resp.status().is_success() {
return Err(OrchestratorError::Io(std::io::Error::other(format!(
"Remote server returned status {} for run",
resp.status()
))));
}
Ok(())
}
#[allow(dead_code)]
pub async fn control_stop(&self, _project_id: &str) -> Result<()> {
let url = format!("{}/api/v1/control/stop", self.base_url);
let req = self.http.post(&url);
let req = self.authorized(req);
let resp = req.send().await.map_err(|e| {
OrchestratorError::Io(std::io::Error::other(format!(
"Failed to call remote stop control: {}",
e
)))
})?;
if !resp.status().is_success() {
return Err(OrchestratorError::Io(std::io::Error::other(format!(
"Remote server returned status {} for stop",
resp.status()
))));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::super::test_helpers::{
        spawn_flexible_mock_http_server, spawn_mock_http_server, spawn_mock_http_server_ordered,
    };
    use super::*;

    /// Upper bound on how long a test waits for the mock server to report a request.
    const RECV_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(3);

    /// Token given to clients in the `*_no_auth_header` tests. Without a
    /// configured token the "no Authorization header" assertions could never fail.
    const UNUSED_TOKEN: &str = "token-that-must-not-be-sent";

    /// Awaits `fut`, panicking if it does not complete within [`RECV_TIMEOUT`].
    async fn within_timeout<F: std::future::Future>(fut: F) -> F::Output {
        tokio::time::timeout(RECV_TIMEOUT, fut)
            .await
            .expect("Timed out waiting for the mock server")
    }

    // ---- ws_url ----

    #[test]
    fn test_ws_url_http() {
        let client = RemoteClient::new("http://localhost:39876", None);
        assert_eq!(client.ws_url(), "ws://localhost:39876/api/v1/ws");
    }

    #[test]
    fn test_ws_url_https() {
        let client = RemoteClient::new("https://example.com", None);
        assert_eq!(client.ws_url(), "wss://example.com/api/v1/ws");
    }

    #[test]
    fn test_ws_url_no_scheme() {
        let client = RemoteClient::new("localhost:39876", None);
        assert_eq!(client.ws_url(), "localhost:39876/api/v1/ws");
    }

    // ---- token resolution ----
    // The environment-based tests use variable names unique to each test so
    // that they do not interfere with one another when run in parallel.

    #[test]
    fn test_resolve_token_explicit() {
        let token = RemoteClient::resolve_token(Some("mytoken".to_string()), Some("SOME_ENV_VAR"));
        assert_eq!(token, Some("mytoken".to_string()));
    }

    #[test]
    fn test_resolve_token_from_env() {
        std::env::set_var("CFLX_TEST_TOKEN_12345", "env_token");
        let token = RemoteClient::resolve_token(None, Some("CFLX_TEST_TOKEN_12345"));
        assert_eq!(token, Some("env_token".to_string()));
        std::env::remove_var("CFLX_TEST_TOKEN_12345");
    }

    #[test]
    fn test_resolve_token_missing_env() {
        std::env::remove_var("CFLX_NONEXISTENT_TOKEN_99999");
        let token = RemoteClient::resolve_token(None, Some("CFLX_NONEXISTENT_TOKEN_99999"));
        assert_eq!(token, None);
    }

    #[test]
    fn test_resolve_token_none() {
        let token = RemoteClient::resolve_token(None, None);
        assert_eq!(token, None);
    }

    #[test]
    fn test_client_with_token_stores_token() {
        let client = RemoteClient::new("http://localhost:39876", Some("secret123".to_string()));
        assert_eq!(client.token(), Some("secret123"));
    }

    #[test]
    fn test_client_without_token() {
        let client = RemoteClient::new("http://localhost:39876", None);
        assert_eq!(client.token(), None);
    }

    // ---- authorized endpoints ----

    #[tokio::test]
    async fn test_list_all_projects_calls_correct_endpoint() {
        let project_json = r#"[{"id":"proj-1","remote_url":"https://github.com/a/b","branch":"main","status":"idle","created_at":"2024-01-01T00:00:00Z"}]"#;
        let responses = vec![(200, project_json.to_string()), (200, "{}".to_string())];
        let (addr, mut path_rx) = spawn_mock_http_server_ordered(responses).await;
        let client = RemoteClient::new(format!("http://{}", addr), None);

        let projects = client
            .list_all_projects()
            .await
            .expect("list_all_projects should succeed");
        assert_eq!(projects.len(), 1);
        assert_eq!(projects[0].id, "proj-1");

        let method_path = within_timeout(path_rx.recv())
            .await
            .expect("No request captured");
        assert_eq!(method_path, "GET /api/v1/projects");
    }

    #[tokio::test]
    async fn test_sync_project_calls_correct_endpoint() {
        let responses = vec![(200, "{}".to_string())];
        let (addr, mut path_rx) = spawn_mock_http_server_ordered(responses).await;
        let client = RemoteClient::new(format!("http://{}", addr), None);

        client
            .sync_project("proj-abc")
            .await
            .expect("sync_project should succeed");

        let method_path = within_timeout(path_rx.recv())
            .await
            .expect("No request captured");
        assert_eq!(method_path, "POST /api/v1/projects/proj-abc/git/sync");
    }

    #[tokio::test]
    async fn test_list_then_sync_ordering() {
        let project_json = r#"[{"id":"proj-1","remote_url":"https://github.com/a/b","branch":"main","status":"idle","created_at":"2024-01-01T00:00:00Z"},{"id":"proj-2","remote_url":"https://github.com/c/d","branch":"dev","status":"idle","created_at":"2024-01-01T00:00:00Z"}]"#;
        // One response for the list call, then one per project sync.
        let responses = vec![
            (200, project_json.to_string()),
            (200, "{}".to_string()),
            (200, "{}".to_string()),
        ];
        let (addr, mut path_rx) = spawn_mock_http_server_ordered(responses).await;
        let client = RemoteClient::new(format!("http://{}", addr), None);

        let projects = client
            .list_all_projects()
            .await
            .expect("list should succeed");
        assert_eq!(projects.len(), 2);
        for project in &projects {
            client
                .sync_project(&project.id)
                .await
                .expect("sync should succeed");
        }

        let first = within_timeout(path_rx.recv()).await.expect("no msg");
        let second = within_timeout(path_rx.recv()).await.expect("no msg");
        let third = within_timeout(path_rx.recv()).await.expect("no msg");
        assert_eq!(first, "GET /api/v1/projects");
        assert_eq!(second, "POST /api/v1/projects/proj-1/git/sync");
        assert_eq!(third, "POST /api/v1/projects/proj-2/git/sync");
    }

    #[tokio::test]
    async fn test_sync_project_error_on_non_200() {
        let responses = vec![(500, r#"{"error":"internal error"}"#.to_string())];
        // Keep the receiver alive for the whole test so the mock server's
        // request report does not hit a closed channel.
        let (addr, _path_rx) = spawn_mock_http_server_ordered(responses).await;
        let client = RemoteClient::new(format!("http://{}", addr), None);

        let result = client.sync_project("failing-project").await;
        assert!(
            result.is_err(),
            "sync_project should return Err on 500 response"
        );
    }

    #[tokio::test]
    async fn test_authorization_header_sent_with_token() {
        let (addr, req_rx) = spawn_mock_http_server().await;
        let client = RemoteClient::new(
            format!("http://{}", addr),
            Some("my-secret-token".to_string()),
        );

        // Only the outgoing request matters here, not the call's result.
        let _ = client.list_projects().await;

        let raw_request = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        let raw_lower = raw_request.to_lowercase();
        assert!(
            raw_lower.contains("authorization: bearer my-secret-token"),
            "Expected 'authorization: bearer my-secret-token' in request headers, got:\n{}",
            raw_request
        );
    }

    #[tokio::test]
    async fn test_no_authorization_header_without_token() {
        let (addr, req_rx) = spawn_mock_http_server().await;
        let client = RemoteClient::new(format!("http://{}", addr), None);

        let _ = client.list_projects().await;

        let raw_request = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        let raw_lower = raw_request.to_lowercase();
        assert!(
            !raw_lower.contains("authorization:"),
            "Did not expect 'Authorization' header when no token is set, got:\n{}",
            raw_request
        );
    }

    // ---- URL parsing ----

    #[test]
    fn test_parse_url_tree_branch() {
        let (url, branch) = parse_project_url("https://github.com/org/repo/tree/develop");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, Some("develop".to_string()));
    }

    #[test]
    fn test_parse_url_tree_branch_main() {
        let (url, branch) = parse_project_url("https://github.com/org/repo/tree/main");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, Some("main".to_string()));
    }

    #[test]
    fn test_parse_url_fragment_branch() {
        let (url, branch) = parse_project_url("https://github.com/org/repo#develop");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, Some("develop".to_string()));
    }

    #[test]
    fn test_parse_url_no_branch() {
        let (url, branch) = parse_project_url("https://github.com/org/repo");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, None);
    }

    #[test]
    fn test_parse_url_trailing_slash_stripped() {
        let (url, branch) = parse_project_url("https://github.com/org/repo/");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, None);
    }

    // ---- branch precedence ----

    #[tokio::test]
    async fn test_default_branch_used_when_omitted() {
        let (url, branch) =
            resolve_project_url_and_branch("https://github.com/org/repo", None, |_url| async {
                Ok("main".to_string())
            })
            .await
            .expect("should succeed");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, "main");
    }

    #[tokio::test]
    async fn test_url_branch_used_when_no_explicit_branch() {
        let (url, branch) = resolve_project_url_and_branch(
            "https://github.com/org/repo/tree/develop",
            None,
            |_url| async {
                panic!("resolver should not be called when URL has a branch");
                #[allow(unreachable_code)]
                Ok(String::new())
            },
        )
        .await
        .expect("should succeed");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, "develop");
    }

    #[tokio::test]
    async fn test_explicit_branch_overrides_url_branch() {
        let (url, branch) = resolve_project_url_and_branch(
            "https://github.com/org/repo/tree/develop",
            Some("main"),
            |_url| async {
                panic!("resolver should not be called when explicit branch is given");
                #[allow(unreachable_code)]
                Ok(String::new())
            },
        )
        .await
        .expect("should succeed");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, "main");
    }

    #[tokio::test]
    async fn test_explicit_branch_overrides_fragment_branch() {
        let (url, branch) = resolve_project_url_and_branch(
            "https://github.com/org/repo#develop",
            Some("main"),
            |_url| async {
                panic!("resolver should not be called when explicit branch is given");
                #[allow(unreachable_code)]
                Ok(String::new())
            },
        )
        .await
        .expect("should succeed");
        assert_eq!(url, "https://github.com/org/repo");
        assert_eq!(branch, "main");
    }

    // ---- management endpoints: no Authorization header ----
    // Each client is built WITH a token so the assertion can actually fail if a
    // management method starts sending it. The raw request is lower-cased
    // before the check because header names are case-insensitive.

    #[tokio::test]
    async fn test_get_project_no_auth_header() {
        let response_json = r#"{"id":"proj-abc123","remote_url":"https://example.com/repo.git","branch":"main","status":"idle","created_at":"2024-01-01T00:00:00Z"}"#;
        let (addr, req_rx) = spawn_flexible_mock_http_server(response_json.to_string()).await;
        let client = RemoteClient::new(format!("http://{}", addr), Some(UNUSED_TOKEN.to_string()));

        let _ = client.get_project("proj-abc123").await;

        let captured = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        assert_eq!(captured.method, "GET");
        assert_eq!(captured.path, "/api/v1/projects/proj-abc123");
        assert!(
            !captured.raw.to_lowercase().contains("authorization:"),
            "get_project must not send Authorization header; got:\n{}",
            captured.raw
        );
    }

    #[tokio::test]
    async fn test_list_projects_management_no_auth_header() {
        let (addr, req_rx) = spawn_flexible_mock_http_server("[]".to_string()).await;
        let client = RemoteClient::new(format!("http://{}", addr), Some(UNUSED_TOKEN.to_string()));

        let _ = client.list_projects_management().await;

        let captured = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        assert_eq!(captured.method, "GET");
        assert_eq!(captured.path, "/api/v1/projects");
        assert!(
            !captured.raw.to_lowercase().contains("authorization:"),
            "list_projects_management must not send Authorization header; got:\n{}",
            captured.raw
        );
    }

    #[tokio::test]
    async fn test_add_project_no_auth_header() {
        let response_json = r#"{"id":"proj-1","remote_url":"https://example.com/repo.git","branch":"main","status":"idle","created_at":"2024-01-01T00:00:00Z"}"#;
        let (addr, req_rx) = spawn_flexible_mock_http_server(response_json.to_string()).await;
        let client = RemoteClient::new(format!("http://{}", addr), Some(UNUSED_TOKEN.to_string()));

        let _ = client
            .add_project("https://example.com/repo.git", "main")
            .await;

        let captured = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        assert_eq!(captured.method, "POST");
        assert_eq!(captured.path, "/api/v1/projects");
        assert!(
            !captured.raw.to_lowercase().contains("authorization:"),
            "add_project must not send Authorization header; got:\n{}",
            captured.raw
        );
        assert!(
            captured.body.contains("remote_url"),
            "Request body should contain remote_url; got: {}",
            captured.body
        );
        assert!(
            captured.body.contains("branch"),
            "Request body should contain branch; got: {}",
            captured.body
        );
    }

    #[tokio::test]
    async fn test_delete_project_no_auth_header() {
        let (addr, req_rx) = spawn_flexible_mock_http_server("{}".to_string()).await;
        let client = RemoteClient::new(format!("http://{}", addr), Some(UNUSED_TOKEN.to_string()));

        let _ = client.delete_project("proj-abc123").await;

        let captured = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        assert_eq!(captured.method, "DELETE");
        assert_eq!(captured.path, "/api/v1/projects/proj-abc123");
        assert!(
            !captured.raw.to_lowercase().contains("authorization:"),
            "delete_project must not send Authorization header; got:\n{}",
            captured.raw
        );
    }

    #[tokio::test]
    async fn test_git_sync_no_auth_header() {
        let (addr, req_rx) = spawn_flexible_mock_http_server("{}".to_string()).await;
        let client = RemoteClient::new(format!("http://{}", addr), Some(UNUSED_TOKEN.to_string()));

        let _ = client.git_sync("proj-abc123").await;

        let captured = within_timeout(req_rx)
            .await
            .expect("Server did not receive request");
        assert_eq!(captured.method, "POST");
        assert_eq!(captured.path, "/api/v1/projects/proj-abc123/git/sync");
        assert!(
            !captured.raw.to_lowercase().contains("authorization:"),
            "git_sync must not send Authorization header; got:\n{}",
            captured.raw
        );
    }
}