use std::collections::HashMap;
use std::sync::Arc;
use serde_json::Value;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::{JoinHandle, JoinSet};
use crate::api::client::ApiClient;
use crate::cli::endpoint::resolve_control_plane_url;
use crate::config::credentials::resolve_api_key;
use crate::config::manager::ConfigManager;
use crate::model::loader::Models;
use crate::model::types::Operation;
use crate::service::executor::OperationExecutor;
use super::app::{BillingInfo, ClusterCounts, HomeMsg, UsageInfo};
const IMPORT_CONCURRENCY: usize = 8;
pub fn spawn(
models: Models,
config_mgr: ConfigManager,
tx: UnboundedSender<HomeMsg>,
) -> JoinHandle<()> {
tokio::spawn(async move { run(models, config_mgr, tx).await })
}
async fn run(models: Models, config_mgr: ConfigManager, tx: UnboundedSender<HomeMsg>) {
let api_key = match resolve_api_key(None, &config_mgr) {
Some(k) => k,
None => {
let msg = "no API key available".to_string();
let _ = tx.send(HomeMsg::ProjectsLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::ImportJobsLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::ClustersLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::BillingLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::UsageLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::VolumesLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::BackupsLoaded(Err(msg)));
return;
}
};
let base_url = resolve_control_plane_url(&config_mgr, &models.control_plane, None);
let client = Arc::new(ApiClient::new(api_key, base_url).quiet());
let projects_and_volumes = fetch_projects_and_volumes(client.clone(), &models, tx.clone());
let clusters_and_imports = fetch_clusters_and_imports(client.clone(), &models, tx.clone());
let billing = fetch_billing(client.clone(), tx.clone());
let usage = fetch_usage(client.clone(), tx.clone());
let backups = fetch_backups(client.clone(), &models, tx.clone());
tokio::join!(
projects_and_volumes,
clusters_and_imports,
billing,
usage,
backups
);
}
async fn fetch_projects_and_volumes(
client: Arc<ApiClient>,
models: &Models,
tx: UnboundedSender<HomeMsg>,
) {
let project_op = match models
.control_plane
.resources
.get("project")
.and_then(|r| r.operations.get("list"))
{
Some(op) => op.clone(),
None => {
let msg = "project.list operation missing from model".to_string();
let _ = tx.send(HomeMsg::ProjectsLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::VolumesLoaded(Err(msg)));
return;
}
};
let executor = OperationExecutor::new(&client);
let projects = match executor.execute(&project_op, &HashMap::new()).await {
Ok(value) => value,
Err(e) => {
let err = e.to_string();
tracing::warn!("home_counts: project.list failed: {}", err);
let _ = tx.send(HomeMsg::ProjectsLoaded(Err(err.clone())));
let _ = tx.send(HomeMsg::VolumesLoaded(Err(err)));
return;
}
};
let count = extract_count(&projects);
let _ = tx.send(HomeMsg::ProjectsLoaded(Ok(count)));
let project_ids: Vec<String> = projects
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|p| {
p.get("projectId")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
})
.collect()
})
.unwrap_or_default();
let volume_op = match models
.control_plane
.resources
.get("volume")
.and_then(|r| r.operations.get("list"))
{
Some(op) => op.clone(),
None => {
let _ = tx.send(HomeMsg::VolumesLoaded(Err(
"volume.list operation missing from model".to_string(),
)));
return;
}
};
if project_ids.is_empty() {
let _ = tx.send(HomeMsg::VolumesLoaded(Ok(0)));
return;
}
let mut total: u64 = 0;
let mut any_ok = false;
for pid in &project_ids {
let mut params = HashMap::new();
params.insert("projectId".to_string(), Value::String(pid.clone()));
match executor.execute(&volume_op, ¶ms).await {
Ok(value) => {
total = total.saturating_add(extract_count(&value));
any_ok = true;
}
Err(e) => {
tracing::warn!("home_counts: volume.list for project {} failed: {}", pid, e);
}
}
}
if any_ok {
let _ = tx.send(HomeMsg::VolumesLoaded(Ok(total)));
} else {
let _ = tx.send(HomeMsg::VolumesLoaded(Err(
"all volume.list calls failed".to_string()
)));
}
}
async fn fetch_clusters_and_imports(
client: Arc<ApiClient>,
models: &Models,
tx: UnboundedSender<HomeMsg>,
) {
let cluster_list_op = match models
.control_plane
.resources
.get("cluster")
.and_then(|r| r.operations.get("list"))
{
Some(op) => op.clone(),
None => {
let msg = "cluster.list operation missing from model".to_string();
let _ = tx.send(HomeMsg::ImportJobsLoaded(Err(msg.clone())));
let _ = tx.send(HomeMsg::ClustersLoaded(Err(msg)));
return;
}
};
let executor = OperationExecutor::new(&client);
let clusters = match executor
.execute_all_pages(&cluster_list_op, &HashMap::new())
.await
{
Ok(items) => items,
Err(e) => {
let err = e.to_string();
tracing::warn!("home_counts: cluster.list failed: {}", err);
let _ = tx.send(HomeMsg::ImportJobsLoaded(Err(err.clone())));
let _ = tx.send(HomeMsg::ClustersLoaded(Err(err)));
return;
}
};
let _ = tx.send(HomeMsg::ClustersLoaded(Ok(count_cluster_statuses(
&clusters,
))));
let import_list_op = match models
.control_plane
.resources
.get("import")
.and_then(|r| r.operations.get("list"))
{
Some(op) => op.clone(),
None => {
let _ = tx.send(HomeMsg::ImportJobsLoaded(Err(
"import.list operation missing from model".to_string(),
)));
return;
}
};
let cluster_ids: Vec<String> = clusters
.iter()
.filter_map(|v| {
v.get("clusterId")
.and_then(|s| s.as_str())
.map(|s| s.to_string())
})
.collect();
if cluster_ids.is_empty() {
let _ = tx.send(HomeMsg::ImportJobsLoaded(Ok(0)));
return;
}
let total = sum_import_counts(client, import_list_op, cluster_ids).await;
let _ = tx.send(match total {
Ok(n) => HomeMsg::ImportJobsLoaded(Ok(n)),
Err(e) => HomeMsg::ImportJobsLoaded(Err(e)),
});
}
pub(crate) fn count_cluster_statuses(clusters: &[Value]) -> ClusterCounts {
let mut running: u64 = 0;
let mut suspended: u64 = 0;
let mut abnormal: u64 = 0;
let total = clusters.len() as u64;
for cluster in clusters {
match cluster.get("status").and_then(|s| s.as_str()).unwrap_or("") {
"RUNNING" => running += 1,
"SUSPENDED" => suspended += 1,
"CREATING" | "SUSPENDING" => {}
_ => abnormal += 1,
}
}
ClusterCounts {
total,
running,
suspended,
abnormal,
}
}
async fn fetch_billing(client: Arc<ApiClient>, tx: UnboundedSender<HomeMsg>) {
use chrono::{Datelike, Utc};
let now = Utc::now();
let this_month_start = format!("{}-{:02}-01T00:00:00Z", now.year(), now.month());
let this_month_end = now.format("%Y-%m-%dT23:59:59Z").to_string();
let (prev_year, prev_month) = if now.month() == 1 {
(now.year() - 1, 12u32)
} else {
(now.year(), now.month() - 1)
};
let last_month_start = format!("{}-{:02}-01T00:00:00Z", prev_year, prev_month);
let last_month_end = format!("{}-{:02}-01T00:00:00Z", now.year(), now.month());
let (cur_r, prev_r) = tokio::join!(
query_usage(&client, &this_month_start, &this_month_end),
query_usage(&client, &last_month_start, &last_month_end),
);
match (cur_r, prev_r) {
(Ok(balance), Ok(last_month)) => {
let settled =
last_month <= 0.0 || check_last_month_settled(&client, prev_year, prev_month).await;
let _ = tx.send(HomeMsg::BillingLoaded(Ok(BillingInfo {
balance,
last_month,
last_month_settled: settled,
})));
}
(Err(e), _) | (_, Err(e)) => {
tracing::warn!("home_counts: usage/query (billing) failed: {}", e);
let _ = tx.send(HomeMsg::BillingLoaded(Err("unavailable".into())));
}
}
}
async fn check_last_month_settled(client: &ApiClient, year: i32, month: u32) -> bool {
let body = serde_json::json!({
"year": year,
"month": month,
});
match client
.call("POST", "/v2/invoices/query", None, Some(&body))
.await
{
Ok(v) => v
.as_array()
.and_then(|arr| arr.first())
.and_then(|inv| inv.get("status").and_then(|s| s.as_str()))
.map(|s| s == "paid" || s == "settled")
.unwrap_or(true),
Err(e) => {
tracing::warn!("home_counts: invoices/query failed: {}", e);
true
}
}
}
async fn fetch_usage(client: Arc<ApiClient>, tx: UnboundedSender<HomeMsg>) {
use chrono::{Datelike, Utc};
let now = Utc::now();
let end = now.format("%Y-%m-%dT23:59:59Z").to_string();
let today_start = now.format("%Y-%m-%dT00:00:00Z").to_string();
let days_since_monday = now.weekday().num_days_from_monday();
let monday = now - chrono::Duration::days(days_since_monday as i64);
let week_start = monday.format("%Y-%m-%dT00:00:00Z").to_string();
let month_start = format!("{}-{:02}-01T00:00:00Z", now.year(), now.month());
let (today_r, week_r, month_r) = tokio::join!(
query_usage(&client, &today_start, &end),
query_usage(&client, &week_start, &end),
query_usage(&client, &month_start, &end),
);
match (today_r, week_r, month_r) {
(Ok(today), Ok(week), Ok(month)) => {
let _ = tx.send(HomeMsg::UsageLoaded(Ok(UsageInfo { today, week, month })));
}
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
tracing::warn!("home_counts: usage/query failed: {}", e);
let _ = tx.send(HomeMsg::UsageLoaded(Err(e)));
}
}
}
async fn query_usage(client: &ApiClient, start: &str, end: &str) -> Result<f64, String> {
let body = serde_json::json!({ "start": start, "end": end });
client
.call("POST", "/v2/usage/query", None, Some(&body))
.await
.map(|v| v.get("totalAmount").and_then(|a| a.as_f64()).unwrap_or(0.0))
.map_err(|e| e.to_string())
}
async fn fetch_backups(client: Arc<ApiClient>, models: &Models, tx: UnboundedSender<HomeMsg>) {
let op = match models
.control_plane
.resources
.get("backup")
.and_then(|r| r.operations.get("list"))
{
Some(op) => op.clone(),
None => {
let _ = tx.send(HomeMsg::BackupsLoaded(Err(
"backup.list operation missing from model".to_string(),
)));
return;
}
};
let executor = OperationExecutor::new(&client);
match executor.execute(&op, &HashMap::new()).await {
Ok(value) => {
let count = extract_count(&value);
let _ = tx.send(HomeMsg::BackupsLoaded(Ok(count)));
}
Err(e) => {
let err = e.to_string();
tracing::warn!("home_counts: backup.list failed: {}", err);
let _ = tx.send(HomeMsg::BackupsLoaded(Err(err)));
}
}
}
async fn sum_import_counts(
client: Arc<ApiClient>,
import_list_op: Operation,
cluster_ids: Vec<String>,
) -> Result<u64, String> {
let total_clusters = cluster_ids.len();
let import_list_op = Arc::new(import_list_op);
let mut iter = cluster_ids.into_iter();
let mut set: JoinSet<(String, Result<u64, String>)> = JoinSet::new();
let mut in_flight: usize = 0;
let mut total: u64 = 0;
let mut failures: usize = 0;
loop {
while in_flight < IMPORT_CONCURRENCY {
match iter.next() {
Some(cid) => {
let client_c = client.clone();
let op_c = import_list_op.clone();
let cid_c = cid.clone();
set.spawn(async move {
let r = fetch_import_count_for_cluster(&client_c, &op_c, &cid_c).await;
(cid_c, r)
});
in_flight += 1;
}
None => break,
}
}
if in_flight == 0 {
break;
}
if let Some(joined) = set.join_next().await {
in_flight -= 1;
match joined {
Ok((_cid, Ok(n))) => {
total = total.saturating_add(n);
}
Ok((cid, Err(e))) => {
tracing::warn!("home_counts: import.list for cluster {} failed: {}", cid, e);
failures += 1;
}
Err(join_err) => {
tracing::warn!("home_counts: import.list task join error: {}", join_err);
failures += 1;
}
}
}
}
if failures == total_clusters {
return Err(format!("all {} import.list calls failed", total_clusters));
}
Ok(total)
}
async fn fetch_import_count_for_cluster(
client: &ApiClient,
import_list_op: &Operation,
cluster_id: &str,
) -> Result<u64, String> {
let mut params = HashMap::new();
params.insert(
"clusterId".to_string(),
Value::String(cluster_id.to_string()),
);
let executor = OperationExecutor::new(client);
executor
.execute(import_list_op, ¶ms)
.await
.map(|v| extract_count(&v))
.map_err(|e| e.to_string())
}
fn extract_count(value: &Value) -> u64 {
if let Some(arr) = value.as_array() {
return arr.len() as u64;
}
if let Some(obj) = value.as_object() {
if let Some(n) = obj.get("count").and_then(|v| v.as_u64()) {
return n;
}
if let Some(arr) = obj.get("records").and_then(|v| v.as_array()) {
return arr.len() as u64;
}
}
0
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn extract_count_from_array() {
let v = json!([{"a": 1}, {"a": 2}, {"a": 3}]);
assert_eq!(extract_count(&v), 3);
}
#[test]
fn extract_count_from_count_field() {
let v = json!({"count": 42, "records": []});
assert_eq!(extract_count(&v), 42);
}
#[test]
fn extract_count_falls_back_to_records_len() {
let v = json!({"records": [{}, {}, {}, {}]});
assert_eq!(extract_count(&v), 4);
}
#[test]
fn extract_count_unknown_shape_is_zero() {
let v = json!({"foo": "bar"});
assert_eq!(extract_count(&v), 0);
}
#[test]
fn count_cluster_statuses_mixed() {
let clusters = vec![
json!({"status": "RUNNING"}),
json!({"status": "RUNNING"}),
json!({"status": "SUSPENDED"}),
json!({"status": "CREATING"}),
json!({"status": "SUSPENDING"}),
json!({"status": "STOPPED"}),
];
let c = count_cluster_statuses(&clusters);
assert_eq!(c.total, 6);
assert_eq!(c.running, 2);
assert_eq!(c.suspended, 1);
assert_eq!(c.abnormal, 1);
}
#[test]
fn count_cluster_statuses_empty() {
let c = count_cluster_statuses(&[]);
assert_eq!(
c,
ClusterCounts {
total: 0,
running: 0,
suspended: 0,
abnormal: 0,
}
);
}
#[test]
fn count_cluster_statuses_missing_status_is_abnormal() {
let clusters = vec![json!({"name": "no-status-field"})];
let c = count_cluster_statuses(&clusters);
assert_eq!(c.total, 1);
assert_eq!(c.abnormal, 1);
}
#[test]
fn count_cluster_statuses_all_transient() {
let clusters = vec![
json!({"status": "CREATING"}),
json!({"status": "SUSPENDING"}),
];
let c = count_cluster_statuses(&clusters);
assert_eq!(c.total, 2);
assert_eq!(c.running, 0);
assert_eq!(c.suspended, 0);
assert_eq!(c.abnormal, 0);
}
}