use serde_json::{Value, json};
use crate::db::DbDriver;
use crate::db::Pool;
use crate::errors::app_error::AppError;
/// Service exposing aggregate statistics (overview, per-table stats, trends) computed from
/// the database reachable through its pool.
pub struct StatsService {
/// Connection pool every query issued by this service runs against.
pool: Pool,
}
impl StatsService {
/// Creates a service that runs its queries against `pool`.
pub fn new(pool: Pool) -> Self {
Self { pool }
}
/// Builds the dashboard overview for `tenant_id`.
///
/// The returned JSON object contains the keys `total_posts`, `total_comments`,
/// `total_users`, `total_media`, `total_categories`, `total_tags`, `posts_by_status`,
/// `comments_by_status`, `content_by_type` and `recent_activity`.
///
/// # Errors
///
/// Returns the first [`AppError`] produced by any of the underlying queries.
pub async fn overview(&self, tenant_id: Option<&str>) -> Result<Value, AppError> {
    // `count_table` selects `FROM {table}` without declaring a table alias, so the filter
    // handed to it must reference `tenant_id` unqualified. An alias-qualified filter would
    // name an alias the count query never defines.
    let tf = crate::db::tenant::tenant_filter_ph(tenant_id, 1);

    let total_posts = count_table(&self.pool, "posts", &tf, tenant_id).await?;
    let total_comments = count_table(&self.pool, "comments", &tf, tenant_id).await?;
    let total_users = count_table(&self.pool, "users", &tf, tenant_id).await?;
    let total_media = count_table(&self.pool, "media", &tf, tenant_id).await?;
    let total_categories = count_table(&self.pool, "categories", &tf, tenant_id).await?;
    let total_tags = count_table(&self.pool, "tags", &tf, tenant_id).await?;

    let content_by_type = self.count_content_types(tenant_id).await?;
    let posts_by_status = self.count_by_status("posts", tenant_id).await?;
    let comments_by_status = self.count_by_status("comments", tenant_id).await?;
    // The overview shows the ten most recent items.
    let recent_activity = self.recent_activity(tenant_id, 10).await?;

    Ok(json!({
        "total_posts": total_posts,
        "total_comments": total_comments,
        "total_users": total_users,
        "total_media": total_media,
        "total_categories": total_categories,
        "total_tags": total_tags,
        "posts_by_status": posts_by_status,
        "comments_by_status": comments_by_status,
        "content_by_type": content_by_type,
        "recent_activity": recent_activity,
    }))
}
pub async fn content_stats(
&self,
table: &str,
tenant_id: Option<&str>,
) -> Result<Value, AppError> {
validate_table_name(table)?;
let tf = crate::db::tenant::tenant_filter_ph(tenant_id, 1);
let has_status = has_column(&self.pool, table, "status").await;
let has_tenant = crate::db::tenant::has_tenant_id(&self.pool, table).await;
let total = count_table(&self.pool, table, &tf, tenant_id).await?;
let mut result = json!({
"table": table,
"total": total,
});
if has_status {
let status_sql = if has_tenant {
let tid = crate::db::tenant::resolve_tenant(tenant_id).to_string();
let sql = format!(
"SELECT status, COUNT(*) as cnt FROM {table} WHERE tenant_id = {} GROUP BY status",
crate::db::Driver::ph(1)
);
let rows: Vec<(String, i64)> = sqlx::query_as::<_, (String, i64)>(&sql)
.bind(&tid)
.fetch_all(&self.pool)
.await
.map_err(|e| AppError::Internal(e.into()))?;
rows
} else {
let sql = format!("SELECT status, COUNT(*) as cnt FROM {table} GROUP BY status");
let rows: Vec<(String, i64)> = sqlx::query_as::<_, (String, i64)>(&sql)
.fetch_all(&self.pool)
.await
.map_err(|e| AppError::Internal(e.into()))?;
rows
};
let mut by_status = serde_json::Map::new();
for (status, count) in status_sql {
by_status.insert(status, json!(count));
}
if let Some(obj) = result.as_object_mut() {
obj.insert("by_status".into(), json!(by_status));
}
}
Ok(result)
}
/// Returns per-day row counts for `table` over a recent window.
///
/// `days` is clamped to `1..=365`. The result holds `table`, the clamped `days`, and `data`:
/// a list of `{"date", "count"}` objects ordered by date. Tables without a `created_at`
/// column yield an empty `data` list. When the table has a tenant column the rows are
/// restricted to the tenant resolved from `tenant_id`.
///
/// # Errors
///
/// Returns `AppError::BadRequest` for an unsafe table name and `AppError::Internal` when the
/// query fails.
pub async fn trends(
    &self,
    table: &str,
    days: i64,
    tenant_id: Option<&str>,
) -> Result<Value, AppError> {
    validate_table_name(table)?;
    let window = days.clamp(1, 365);

    let timestamped = has_column(&self.pool, table, "created_at").await;
    let tenant_scoped = crate::db::tenant::has_tenant_id(&self.pool, table).await;
    if !timestamped {
        return Ok(json!({
            "table": table,
            "days": window,
            "data": [],
        }));
    }

    let day_bucket = date_trunc_day_expr("created_at");
    let cutoff = crate::db::Driver::ago_expr(window);
    // Optional leading predicate; empty for tables that carry no tenant column.
    let tenant_predicate = if tenant_scoped {
        format!("tenant_id = {} AND ", crate::db::Driver::ph(1))
    } else {
        String::new()
    };
    let sql = format!(
        "SELECT {day_bucket} as d, COUNT(*) as cnt FROM {table} \
         WHERE {tenant_predicate}created_at >= {cutoff} \
         GROUP BY d ORDER BY d"
    );

    let query = sqlx::query_as::<_, (String, i64)>(&sql);
    let query = if tenant_scoped {
        query.bind(crate::db::tenant::resolve_tenant(tenant_id).to_string())
    } else {
        query
    };

    let data: Vec<Value> = query
        .fetch_all(&self.pool)
        .await
        .map_err(|e| AppError::Internal(e.into()))?
        .into_iter()
        .map(|(date, count)| json!({"date": date, "count": count}))
        .collect();

    Ok(json!({
        "table": table,
        "days": window,
        "data": data,
    }))
}
/// Counts the rows of every table returned by `get_content_tables`, keyed by table name.
///
/// # Errors
///
/// Returns the first [`AppError`] raised while listing the tables or counting one of them.
async fn count_content_types(
    &self,
    tenant_id: Option<&str>,
) -> Result<serde_json::Map<String, Value>, AppError> {
    let tables = get_content_tables(&self.pool).await?;
    // The filter depends only on `tenant_id`, so build it once rather than once per table.
    // NOTE(review): the same filter is applied to every listed table; presumably all of them
    // carry a tenant column — confirm against `list_user_tables`.
    let tf = crate::db::tenant::tenant_filter_ph(tenant_id, 1);

    let mut result = serde_json::Map::new();
    for table in &tables {
        let count = count_table(&self.pool, table, &tf, tenant_id).await?;
        result.insert(table.clone(), json!(count));
    }
    Ok(result)
}
/// Groups the rows of `table` by their `status` value and returns the count per status.
///
/// Yields an empty map when the table has no `status` column. When the table has a tenant
/// column, only rows of the tenant resolved from `tenant_id` are counted.
///
/// # Errors
///
/// Returns `AppError::BadRequest` for an unsafe table name and `AppError::Internal` when the
/// query fails.
async fn count_by_status(
    &self,
    table: &str,
    tenant_id: Option<&str>,
) -> Result<serde_json::Map<String, Value>, AppError> {
    validate_table_name(table)?;
    if !has_column(&self.pool, table, "status").await {
        return Ok(serde_json::Map::new());
    }

    let tenant_scoped = crate::db::tenant::has_tenant_id(&self.pool, table).await;
    let sql = if tenant_scoped {
        format!(
            "SELECT status, COUNT(*) as cnt FROM {table} WHERE tenant_id = {} GROUP BY status",
            crate::db::Driver::ph(1)
        )
    } else {
        format!("SELECT status, COUNT(*) as cnt FROM {table} GROUP BY status")
    };

    let mut query = sqlx::query_as::<_, (String, i64)>(&sql);
    if tenant_scoped {
        // Only the tenant-scoped statement contains a placeholder to bind.
        query = query.bind(crate::db::tenant::resolve_tenant(tenant_id).to_string());
    }
    let counts = query
        .fetch_all(&self.pool)
        .await
        .map_err(|e| AppError::Internal(e.into()))?;

    Ok(counts
        .into_iter()
        .map(|(status, n)| (status, json!(n)))
        .collect())
}
/// Returns up to `limit` of the most recent posts and comments, newest first.
///
/// Each entry is a JSON object with a `type` of `post.created` (plus `title`, `slug`, `at`)
/// or `comment.created` (plus `content`, `at`). Up to `limit` rows are fetched from each
/// table, merged, sorted by `at` descending and truncated to `limit`. A negative `limit` is
/// treated as zero.
///
/// # Errors
///
/// Returns `AppError::Internal` when either query fails.
async fn recent_activity(
    &self,
    tenant_id: Option<&str>,
    limit: i64,
) -> Result<Vec<Value>, AppError> {
    raisfast_derive::check_schema!("posts", "title", "slug", "created_at");
    raisfast_derive::check_schema!("comments", "content", "created_at");

    // Clamp so a negative value can neither reach the SQL `LIMIT` clause nor wrap around to
    // a huge `usize` in the final truncation.
    let limit = limit.max(0);
    let limit_clause = format!("LIMIT {limit}");
    let mut activities = Vec::new();

    // Each statement needs a filter qualified with the alias that statement declares.
    let post_filter = crate::db::tenant::tenant_filter_aliased_ph("p", tenant_id, 1);
    let post_sql = format!(
        "SELECT p.title, p.slug, p.created_at FROM posts p WHERE 1=1{post_filter} \
         ORDER BY p.created_at DESC {limit_clause}"
    );
    let posts: Vec<(Option<String>, String, String)> = raisfast_derive::crud_query!(
        &self.pool,
        (Option<String>, String, String),
        &post_sql,
        [],
        fetch_all,
        tenant: tenant_id
    )
    .map_err(|e| AppError::Internal(e.into()))?;
    for (title, slug, at) in posts {
        activities.push(json!({
            "type": "post.created",
            "title": title.unwrap_or_default(),
            "slug": slug,
            "at": at,
        }));
    }

    // The comments table is aliased as `c`, so its filter must be qualified with `c`.
    let comment_filter = crate::db::tenant::tenant_filter_aliased_ph("c", tenant_id, 1);
    let comment_sql = format!(
        "SELECT c.content, c.created_at FROM comments c WHERE 1=1{comment_filter} \
         ORDER BY c.created_at DESC {limit_clause}"
    );
    let comments: Vec<(Option<String>, String)> = raisfast_derive::crud_query!(
        &self.pool,
        (Option<String>, String),
        &comment_sql,
        [],
        fetch_all,
        tenant: tenant_id
    )
    .map_err(|e| AppError::Internal(e.into()))?;
    for (content, at) in comments {
        activities.push(json!({
            "type": "comment.created",
            "content": content.unwrap_or_default(),
            "at": at,
        }));
    }

    // Timestamps are compared as strings; the stable sort keeps posts ahead of comments
    // that share the same timestamp.
    activities.sort_by(|a, b| {
        let at_a = a["at"].as_str().unwrap_or("");
        let at_b = b["at"].as_str().unwrap_or("");
        at_b.cmp(at_a)
    });
    // `limit` is non-negative here; a value too large for `usize` simply means "keep all".
    activities.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
    Ok(activities)
}
}
/// Counts the rows of `table`, appending `tenant_filter` verbatim after `WHERE 1=1`.
///
/// A tenant value is bound to the statement only when `tenant_id` is `Some`, so callers are
/// expected to pass a `tenant_filter` whose placeholder usage matches that.
///
/// # Errors
///
/// Returns `AppError::BadRequest` for an unsafe table name and `AppError::Internal` when the
/// query fails.
async fn count_table(
    pool: &Pool,
    table: &str,
    tenant_filter: &str,
    tenant_id: Option<&str>,
) -> Result<i64, AppError> {
    validate_table_name(table)?;

    let sql = format!("SELECT COUNT(*) FROM {table} WHERE 1=1{tenant_filter}");
    let statement = sqlx::query_scalar::<_, i64>(&sql);
    let statement = match tenant_id {
        Some(_) => statement.bind(crate::db::tenant::resolve_tenant(tenant_id)),
        None => statement,
    };

    let counted = statement.fetch_one(pool).await;
    counted.map_err(|e| AppError::Internal(e.into()))
}
/// Reports whether `table` has a column named `column`, by delegating to
/// `crate::db::Driver::has_column`.
async fn has_column(pool: &Pool, table: &str, column: &str) -> bool {
crate::db::Driver::has_column(pool, table, column).await
}
/// Lists the user tables reported by `crate::db::Driver::list_user_tables`, passing it a
/// fixed, quoted, comma-separated list of table names to exclude.
///
/// The call itself is infallible here; the `Result` wrapper always carries `Ok`.
async fn get_content_tables(pool: &Pool) -> Result<Vec<String>, AppError> {
    // Table names handed to the driver as the exclusion list.
    const EXCLUDED_TABLES: &str = "'users','refresh_tokens','media','plugin_storage','roles','permissions','options','tenants','pending_jobs','cron_schedules','cron_execution_log'";

    let tables = crate::db::Driver::list_user_tables(pool, EXCLUDED_TABLES).await;
    Ok(tables)
}
/// Returns the SQL expression produced by `crate::db::Driver::date_trunc_day` for `col`;
/// `trends` uses it as the per-day grouping key.
fn date_trunc_day_expr(col: &str) -> String {
crate::db::Driver::date_trunc_day(col)
}
/// Rejects table names that `crate::db::driver::is_safe_identifier` does not accept.
///
/// Table names are interpolated directly into SQL text in this module, so every entry point
/// runs this check first.
///
/// # Errors
///
/// Returns `AppError::BadRequest` naming the offending table.
fn validate_table_name(table: &str) -> Result<(), AppError> {
    if !crate::db::driver::is_safe_identifier(table) {
        return Err(AppError::BadRequest(format!("invalid table name: {table}")));
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// DDL for the six tables `overview` counts, in creation order.
    const OVERVIEW_SCHEMA: &[&str] = &[
        "CREATE TABLE posts (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, slug TEXT, created_at TEXT NOT NULL, tenant_id TEXT NOT NULL DEFAULT 'default')",
        "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL DEFAULT 'default', username TEXT NOT NULL, role TEXT NOT NULL, status TEXT NOT NULL, registered_via TEXT NOT NULL)",
        "CREATE TABLE comments (id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, created_at TEXT NOT NULL, tenant_id TEXT NOT NULL DEFAULT 'default')",
        "CREATE TABLE media (id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL DEFAULT 'default')",
        "CREATE TABLE categories (id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL DEFAULT 'default')",
        "CREATE TABLE tags (id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL DEFAULT 'default')",
    ];

    /// Opens a fresh in-memory database.
    async fn memory_pool() -> Pool {
        Pool::connect(":memory:").await.unwrap()
    }

    /// Executes one parameterless statement, panicking on failure.
    async fn run(pool: &Pool, sql: &str) {
        sqlx::query(sql).execute(pool).await.unwrap();
    }

    /// Opens an in-memory database containing every table in `OVERVIEW_SCHEMA`.
    async fn overview_pool() -> Pool {
        let pool = memory_pool().await;
        for ddl in OVERVIEW_SCHEMA.iter().copied() {
            run(&pool, ddl).await;
        }
        pool
    }

    #[tokio::test]
    async fn stats_overview_empty_db() {
        let svc = StatsService::new(overview_pool().await);

        let result = svc.overview(None).await.unwrap();

        assert_eq!(result["total_posts"], 0);
        assert_eq!(result["total_users"], 0);
        assert_eq!(result["total_comments"], 0);
        assert_eq!(result["total_media"], 0);
    }

    #[tokio::test]
    async fn stats_overview_with_data() {
        let pool = overview_pool().await;
        run(
            &pool,
            "INSERT INTO posts (id, title, slug, created_at) VALUES (1, 'Hello', 'hello', '2024-01-01T00:00:00Z')",
        )
        .await;
        run(
            &pool,
            "INSERT INTO users (id, username, role, status, registered_via) VALUES (1, 'user1', 'reader', 'active', 'email')",
        )
        .await;
        let svc = StatsService::new(pool);

        let result = svc.overview(None).await.unwrap();

        assert_eq!(result["total_posts"], 1);
        assert_eq!(result["total_users"], 1);
        assert_eq!(result["total_comments"], 0);
        let activity = result["recent_activity"].as_array().unwrap();
        assert!(!activity.is_empty());
        assert_eq!(activity[0]["type"], "post.created");
    }

    #[tokio::test]
    async fn stats_content_stats_with_status() {
        let pool = memory_pool().await;
        run(
            &pool,
            "CREATE TABLE ct_test (id INTEGER PRIMARY KEY AUTOINCREMENT, status TEXT, tenant_id TEXT NOT NULL DEFAULT 'default')",
        )
        .await;
        // One draft and two published rows.
        run(&pool, "INSERT INTO ct_test (id, status) VALUES (1, 'draft')").await;
        run(&pool, "INSERT INTO ct_test (id, status) VALUES (2, 'published')").await;
        run(&pool, "INSERT INTO ct_test (id, status) VALUES (3, 'published')").await;
        let svc = StatsService::new(pool);

        let result = svc.content_stats("ct_test", None).await.unwrap();

        assert_eq!(result["total"], 3);
        assert_eq!(result["by_status"]["draft"], 1);
        assert_eq!(result["by_status"]["published"], 2);
    }

    #[tokio::test]
    async fn stats_trends() {
        let pool = memory_pool().await;
        run(
            &pool,
            "CREATE TABLE ct_trends (id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, tenant_id TEXT NOT NULL DEFAULT 'default')",
        )
        .await;
        // A row dated today always falls inside the seven-day window queried below.
        let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
        sqlx::query("INSERT INTO ct_trends (id, created_at) VALUES (1, ?)")
            .bind(&today)
            .execute(&pool)
            .await
            .unwrap();
        let svc = StatsService::new(pool);

        let result = svc.trends("ct_trends", 7, None).await.unwrap();

        assert_eq!(result["days"], 7);
        let data = result["data"].as_array().unwrap();
        assert!(!data.is_empty());
        assert_eq!(data[0]["count"], 1);
    }
}