use crate::health::{BackpressureChecker, BackpressureMetrics, QueueDepthStatus};
use crate::orchestration::core::OrchestrationCore;
use crate::services::{
AnalyticsService, HealthService, SharedApiServices, StepService, TaskService,
TemplateQueryService,
};
use crate::web::circuit_breaker::WebDatabaseCircuitBreaker;
use sqlx::PgPool;
use std::sync::Arc;
use tasker_shared::config::web::WebConfig;
use tasker_shared::types::SecurityService;
use tasker_shared::types::web::{ApiError, DbOperationType, SystemOperationalState};
use tasker_shared::TaskerResult;
use tokio::sync::RwLock;
use tracing::info;
pub use crate::api_common::operational_status::{DatabasePoolUsageStats, OrchestrationStatus};
/// Application state for the web API layer.
///
/// Holds the web configuration, the optional auth configuration derived from it,
/// and a shared handle to the API services. Every field is behind an `Arc`
/// (or an `Option` of one), so cloning the state only bumps reference counts.
#[derive(Clone, Debug)]
pub struct AppState {
/// Web configuration handed to [`AppState::new`].
pub config: Arc<WebConfig>,
/// Auth configuration converted from the `auth` section of the web configuration;
/// `None` when that section is absent.
pub auth_config: Option<Arc<tasker_shared::config::WebAuthConfig>>,
/// Shared API services that the accessor methods on this type read from.
pub services: Arc<SharedApiServices>,
}
impl AppState {
/// Builds the web API state from already-constructed shared services and a web configuration.
///
/// When `web_config.auth` is present it is cloned, converted into a
/// `tasker_shared::config::WebAuthConfig` and wrapped in an `Arc`; otherwise
/// `auth_config` is `None`. The configuration is moved into an `Arc` and `services`
/// is stored as given. Emits one `info` event carrying `services.is_auth_enabled()`.
pub fn new(services: Arc<SharedApiServices>, web_config: WebConfig) -> Self {
    // Derive the auth view first: it borrows `web_config`, which is moved further down.
    let auth_config = web_config
        .auth
        .as_ref()
        .map(|auth| tasker_shared::config::WebAuthConfig::from(auth.clone()))
        .map(Arc::new);

    info!(
        auth_enabled = services.is_auth_enabled(),
        "Web API application state created from shared services"
    );

    let config = Arc::new(web_config);
    Self {
        config,
        auth_config,
        services,
    }
}
/// Borrows the security service held by the shared services, if there is one.
///
/// Returns `None` when the shared services carry no security service.
pub fn security_service(&self) -> Option<&Arc<SecurityService>> {
    let shared = &self.services;
    shared.security_service.as_ref()
}
/// Borrows the pool used for web API database access.
///
/// This is the `write_pool` of the shared services.
pub fn web_db_pool(&self) -> &PgPool {
    let shared = &self.services;
    &shared.write_pool
}
/// Borrows the pool exposed as the orchestration database pool.
///
/// This is the `read_pool` of the shared services.
pub fn orchestration_db_pool(&self) -> &PgPool {
    let shared = &self.services;
    &shared.read_pool
}
/// Borrows the database circuit breaker held by the shared services.
pub fn web_db_circuit_breaker(&self) -> &WebDatabaseCircuitBreaker {
    let shared = &self.services;
    &shared.circuit_breaker
}
/// Borrows the shared handle to the orchestration core.
///
/// The `Arc` itself is returned by reference; callers clone it if they need ownership.
pub fn orchestration_core(&self) -> &Arc<OrchestrationCore> {
    let shared = &self.services;
    &shared.orchestration_core
}
/// Borrows the shared handle to the analytics service.
pub fn analytics_service(&self) -> &Arc<AnalyticsService> {
    let shared = &self.services;
    &shared.analytics_service
}
/// Borrows the task service held by the shared services.
pub fn task_service(&self) -> &TaskService {
    let shared = &self.services;
    &shared.task_service
}
/// Borrows the step service held by the shared services.
pub fn step_service(&self) -> &StepService {
    let shared = &self.services;
    &shared.step_service
}
/// Borrows the health service held by the shared services.
pub fn health_service(&self) -> &HealthService {
    let shared = &self.services;
    &shared.health_service
}
/// Borrows the template query service held by the shared services.
pub fn template_query_service(&self) -> &TemplateQueryService {
    let shared = &self.services;
    &shared.template_query_service
}
/// Borrows the lock-protected orchestration status shared through the services.
///
/// Callers take the read or write side of the `RwLock` themselves.
pub fn orchestration_status(&self) -> &Arc<RwLock<OrchestrationStatus>> {
    let shared = &self.services;
    &shared.orchestration_status
}
/// Picks the database pool that matches the kind of operation being performed.
///
/// `ReadOnly` and `Analytics` operations get the shared `read_pool`;
/// `WebWrite` and `WebCritical` operations get the shared `write_pool`.
pub fn select_db_pool(&self, operation_type: DbOperationType) -> &PgPool {
    let shared = &self.services;
    match operation_type {
        DbOperationType::ReadOnly | DbOperationType::Analytics => &shared.read_pool,
        DbOperationType::WebWrite | DbOperationType::WebCritical => &shared.write_pool,
    }
}
/// Reports whether the database is considered healthy.
///
/// Healthy means the circuit breaker does not report an open circuit.
pub fn is_database_healthy(&self) -> bool {
    let breaker = &self.services.circuit_breaker;
    !breaker.is_circuit_open()
}
/// Records a successful database operation with the circuit breaker.
pub fn record_database_success(&self) {
    let breaker = &self.services.circuit_breaker;
    breaker.record_success();
}
/// Records a failed database operation with the circuit breaker.
pub fn record_database_failure(&self) {
    let breaker = &self.services.circuit_breaker;
    breaker.record_failure();
}
/// Replaces the shared orchestration status with `new_status`.
///
/// Waits for the write side of the status lock, overwrites the stored value,
/// and releases the lock before returning.
pub async fn update_orchestration_status(&self, new_status: OrchestrationStatus) {
    // The write guard is a temporary, so it is released as soon as the assignment completes.
    *self.services.orchestration_status.write().await = new_status;
}
/// Returns a copy of the current operational state.
///
/// Takes the read side of the orchestration status lock and clones the
/// `operational_state` field out of it.
pub async fn operational_state(&self) -> SystemOperationalState {
    let status = self.services.orchestration_status.read().await;
    status.operational_state.clone()
}
/// Async, `Result`-returning wrapper around [`Self::get_pool_usage_stats`].
///
/// The body performs no awaiting and always returns `Ok`.
pub async fn report_pool_usage_stats(&self) -> TaskerResult<DatabasePoolUsageStats> {
    let stats = self.get_pool_usage_stats();
    Ok(stats)
}
/// Takes a snapshot of how full the web API connection pool is.
///
/// The connection count is the current `size()` of the shared `write_pool`, and the
/// limit is `database_pools.web_api_max_connections` from the web configuration.
/// `usage_ratio` is count / limit, or `0.0` when the limit is not positive.
/// The pool is reported healthy while the ratio is at most 0.75.
pub fn get_pool_usage_stats(&self) -> DatabasePoolUsageStats {
    // Usage ratio above which the pool is reported as unhealthy.
    const HEALTHY_USAGE_CEILING: f64 = 0.75;

    let active_connections = self.services.write_pool.size();
    let max_connections = self.config.database_pools.web_api_max_connections;

    // Guard the division: a non-positive limit is reported as zero usage.
    let usage_ratio = if max_connections > 0 {
        active_connections as f64 / max_connections as f64
    } else {
        0.0
    };
    let is_healthy = usage_ratio <= HEALTHY_USAGE_CEILING;

    DatabasePoolUsageStats {
        pool_name: String::from("web_api_pool"),
        active_connections,
        max_connections,
        usage_ratio,
        is_healthy,
    }
}
/// Returns the command channel saturation reported by the backpressure checker.
///
/// Reads the checker's `try_get_channel_status()` result. When that status is not
/// marked `evaluated`, `0.0` is returned instead of its `command_saturation_percent`.
pub fn get_command_channel_saturation(&self) -> f64 {
    let status = self
        .services
        .orchestration_core
        .backpressure_checker()
        .try_get_channel_status();

    if status.evaluated {
        status.command_saturation_percent
    } else {
        0.0
    }
}
/// Reports whether the command channel is flagged as saturated.
///
/// Returns `false` when the channel status is not marked `evaluated`;
/// otherwise returns the status's `is_saturated` flag.
pub fn is_command_channel_saturated(&self) -> bool {
    let status = self
        .services
        .orchestration_core
        .backpressure_checker()
        .try_get_channel_status();

    // An unevaluated status never counts as saturated.
    if !status.evaluated {
        return false;
    }
    status.is_saturated
}
/// Reports whether the command channel is flagged as critical.
///
/// Returns `false` when the channel status is not marked `evaluated`;
/// otherwise returns the status's `is_critical` flag.
pub fn is_command_channel_critical(&self) -> bool {
    let status = self
        .services
        .orchestration_core
        .backpressure_checker()
        .try_get_channel_status();

    // An unevaluated status never counts as critical.
    if !status.evaluated {
        return false;
    }
    status.is_critical
}
/// Asks the backpressure checker whether a request should currently be rejected.
///
/// Forwards the result of the checker's `try_check_backpressure()`:
/// `Some(ApiError)` when it reports an error to return, `None` otherwise.
pub fn check_backpressure_status(&self) -> Option<ApiError> {
    let checker = self.backpressure_checker();
    checker.try_check_backpressure()
}
/// Returns the queue depth status from the backpressure checker.
///
/// Synchronous; forwards the checker's `try_get_queue_depth_status()` result unchanged.
pub fn try_get_queue_depth_status(&self) -> QueueDepthStatus {
    let checker = self.backpressure_checker();
    checker.try_get_queue_depth_status()
}
/// Borrows the backpressure checker owned by the orchestration core.
pub fn backpressure_checker(&self) -> &BackpressureChecker {
    let core = self.orchestration_core();
    core.backpressure_checker()
}
/// Awaits and returns the backpressure metrics produced by the backpressure checker.
pub async fn get_backpressure_metrics(&self) -> BackpressureMetrics {
    let checker = self.backpressure_checker();
    checker.get_backpressure_metrics().await
}
/// Async variant of [`Self::try_get_queue_depth_status`].
///
/// Performs no awaiting; it returns the checker's `try_get_queue_depth_status()`
/// result, the same value the synchronous method yields.
pub async fn check_queue_depth_status(&self) -> QueueDepthStatus {
    self.backpressure_checker().try_get_queue_depth_status()
}
}