#[cfg(feature = "portal")]
use axum::{
body::Body,
extract::{Json, Path, State},
http::StatusCode,
response::{IntoResponse, Response},
};
#[cfg(feature = "portal")]
use chrono::{DateTime, Duration, Utc};
#[cfg(feature = "portal")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "portal")]
use sqlx::PgPool;
#[cfg(feature = "portal")]
use std::path::PathBuf;
#[cfg(feature = "portal")]
use tokio_util::io::ReaderStream;
#[cfg(feature = "portal")]
use uuid::Uuid;
#[cfg(feature = "portal")]
use crate::portal::auth_db::PortalState;
#[cfg(feature = "portal")]
use crate::portal::middleware::AuthClaims;
#[cfg(feature = "portal")]
/// Lifecycle state of an export job.
///
/// Persisted as lowercase `text` in Postgres and serialized as lowercase
/// JSON strings; the variants mirror the string literals used in the
/// repository SQL ('pending', 'processing', 'completed', 'failed').
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
#[serde(rename_all = "lowercase")]
pub enum ExportStatus {
/// Created but not yet picked up by the processor.
Pending,
/// Currently being gathered and written to disk.
Processing,
/// File written successfully and available for download.
Completed,
/// Processing failed; see `error_message` on the job row.
Failed,
}
#[cfg(feature = "portal")]
impl std::fmt::Display for ExportStatus {
    /// Renders the status as its lowercase database/JSON representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
#[cfg(feature = "portal")]
/// Output file format for an export.
///
/// Persisted as lowercase `text` in Postgres and serialized as lowercase
/// JSON strings ("json" / "csv"), matching the values accepted by the
/// `create_export` handler's validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "text", rename_all = "lowercase")]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
Json,
Csv,
}
#[cfg(feature = "portal")]
impl std::fmt::Display for ExportFormat {
    /// Renders the format as its lowercase database/JSON representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Json => "json",
            Self::Csv => "csv",
        })
    }
}
#[cfg(feature = "portal")]
/// One row of the `export_jobs` table (mapped via `sqlx::FromRow`).
///
/// NOTE(review): `format` and `status` are raw `String`s read straight
/// from the text columns rather than `ExportFormat` / `ExportStatus`;
/// consider migrating to the typed enums (both derive `sqlx::Type`).
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ExportJob {
pub id: Uuid,
// Owner of the export; all queries filter on this for access control.
pub user_id: Uuid,
// "json" or "csv" (see ExportFormat).
pub format: String,
// "pending" | "processing" | "completed" | "failed" (see ExportStatus).
pub status: String,
pub include_settings: bool,
pub include_api_keys: bool,
pub include_usage_stats: bool,
// Set by mark_completed; absolute path of the written export file.
pub file_path: Option<String>,
pub file_size_bytes: Option<i64>,
// Set by mark_failed when processing errors out.
pub error_message: Option<String>,
// After this instant the job (and its file) is eligible for cleanup.
pub expires_at: Option<DateTime<Utc>>,
pub created_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
}
#[cfg(feature = "portal")]
/// Request body for `create_export`.
///
/// Serde defaults: `include_settings` and `include_api_keys` default to
/// true, `include_usage_stats` to false (bool's default).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportOptions {
// Requested output format; validated to be "json" or "csv" by the handler.
pub format: String,
#[serde(default = "default_true")]
pub include_settings: bool,
#[serde(default = "default_true")]
pub include_api_keys: bool,
#[serde(default)]
pub include_usage_stats: bool,
}
#[cfg(feature = "portal")]
// Helper for `#[serde(default = "default_true")]`, which requires a
// function path rather than a literal.
fn default_true() -> bool {
true
}
#[cfg(feature = "portal")]
impl Default for ExportOptions {
    /// Mirrors the serde field defaults: JSON format, settings and API
    /// keys included, usage statistics excluded.
    fn default() -> Self {
        ExportOptions {
            format: String::from("json"),
            include_settings: true,
            include_api_keys: true,
            include_usage_stats: false,
        }
    }
}
#[cfg(feature = "portal")]
/// API-facing view of an export job, with timestamps rendered as RFC 3339
/// strings. Built from an `ExportJob` via `from_job`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportJobResponse {
pub id: Uuid,
pub status: String,
pub format: String,
pub created_at: String,
pub completed_at: Option<String>,
pub expires_at: Option<String>,
// Present only for completed jobs when a base URL was supplied.
pub download_url: Option<String>,
pub file_size_bytes: Option<i64>,
pub error_message: Option<String>,
}
#[cfg(feature = "portal")]
impl ExportJobResponse {
    /// Converts a database row into its API representation.
    ///
    /// A download URL is attached only when the job has completed AND a
    /// `base_url` was provided; otherwise it is `None`.
    pub fn from_job(job: ExportJob, base_url: Option<&str>) -> Self {
        let download_url = base_url
            .filter(|_| job.status == "completed")
            .map(|url| format!("{}/portal/export/{}/download", url, job.id));
        let created_at = job.created_at.to_rfc3339();
        let completed_at = job.completed_at.map(|dt| dt.to_rfc3339());
        let expires_at = job.expires_at.map(|dt| dt.to_rfc3339());
        Self {
            id: job.id,
            status: job.status,
            format: job.format,
            created_at,
            completed_at,
            expires_at,
            download_url,
            file_size_bytes: job.file_size_bytes,
            error_message: job.error_message,
        }
    }
}
#[cfg(feature = "portal")]
/// Thin data-access layer over the `export_jobs` table. Borrows the pool,
/// so it is cheap to construct per request.
pub struct ExportJobRepository<'a> {
pool: &'a PgPool,
}
#[cfg(feature = "portal")]
impl<'a> ExportJobRepository<'a> {
    /// Creates a repository that borrows the given connection pool.
    pub fn new(pool: &'a PgPool) -> Self {
        Self { pool }
    }

    /// Inserts a job in the `pending` state for `user_id` and returns the
    /// stored row. The job expires 7 days after creation.
    pub async fn create(
        &self,
        user_id: Uuid,
        options: &ExportOptions,
    ) -> Result<ExportJob, sqlx::Error> {
        let expires_at = Utc::now() + Duration::days(7);
        let job = sqlx::query_as::<_, ExportJob>(
            r#"
INSERT INTO export_jobs (
user_id,
format,
status,
include_settings,
include_api_keys,
include_usage_stats,
expires_at
)
VALUES ($1, $2, 'pending', $3, $4, $5, $6)
RETURNING *
"#,
        )
        .bind(user_id)
        .bind(&options.format)
        .bind(options.include_settings)
        .bind(options.include_api_keys)
        .bind(options.include_usage_stats)
        .bind(expires_at)
        .fetch_one(self.pool)
        .await?;
        Ok(job)
    }

    /// Fetches a job only if it belongs to `user_id` (access control lives
    /// in the query itself).
    pub async fn get_for_user(
        &self,
        job_id: Uuid,
        user_id: Uuid,
    ) -> Result<Option<ExportJob>, sqlx::Error> {
        let job = sqlx::query_as::<_, ExportJob>(
            r#"
SELECT * FROM export_jobs
WHERE id = $1 AND user_id = $2
"#,
        )
        .bind(job_id)
        .bind(user_id)
        .fetch_optional(self.pool)
        .await?;
        Ok(job)
    }

    /// Returns the user's 50 most recent jobs, newest first.
    pub async fn list_for_user(&self, user_id: Uuid) -> Result<Vec<ExportJob>, sqlx::Error> {
        let jobs = sqlx::query_as::<_, ExportJob>(
            r#"
SELECT * FROM export_jobs
WHERE user_id = $1
ORDER BY created_at DESC
LIMIT 50
"#,
        )
        .bind(user_id)
        .fetch_all(self.pool)
        .await?;
        Ok(jobs)
    }

    /// Returns up to 100 pending jobs, oldest first.
    ///
    /// NOTE(review): there is no row locking here (e.g. FOR UPDATE SKIP
    /// LOCKED), so running more than one `ExportProcessor` concurrently
    /// could process the same job twice — confirm single-worker deployment.
    pub async fn list_pending(&self) -> Result<Vec<ExportJob>, sqlx::Error> {
        let jobs = sqlx::query_as::<_, ExportJob>(
            r#"
SELECT * FROM export_jobs
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 100
"#,
        )
        .fetch_all(self.pool)
        .await?;
        Ok(jobs)
    }

    /// Transitions a job to the `processing` state.
    pub async fn mark_processing(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
UPDATE export_jobs
SET status = 'processing'
WHERE id = $1
"#,
        )
        .bind(job_id)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Records a successful export: file location, size, and completion time.
    pub async fn mark_completed(
        &self,
        job_id: Uuid,
        file_path: &str,
        file_size_bytes: i64,
    ) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
UPDATE export_jobs
SET status = 'completed',
file_path = $2,
file_size_bytes = $3,
completed_at = NOW()
WHERE id = $1
"#,
        )
        .bind(job_id)
        .bind(file_path)
        .bind(file_size_bytes)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Records a failed export with its error message and completion time.
    pub async fn mark_failed(&self, job_id: Uuid, error_message: &str) -> Result<(), sqlx::Error> {
        sqlx::query(
            r#"
UPDATE export_jobs
SET status = 'failed',
error_message = $2,
completed_at = NOW()
WHERE id = $1
"#,
        )
        .bind(job_id)
        .bind(error_message)
        .execute(self.pool)
        .await?;
        Ok(())
    }

    /// Deletes ALL expired jobs and returns the file paths (if any) of the
    /// deleted rows so the caller can remove the files from disk.
    ///
    /// Fixed: the previous query only deleted expired rows with
    /// `file_path IS NOT NULL`, so expired jobs that never produced a file
    /// (failed or abandoned ones) accumulated in the table forever.
    pub async fn delete_expired(&self) -> Result<Vec<String>, sqlx::Error> {
        let rows = sqlx::query_as::<_, (Option<String>,)>(
            r#"
DELETE FROM export_jobs
WHERE expires_at < NOW()
RETURNING file_path
"#,
        )
        .fetch_all(self.pool)
        .await?;
        // Only rows that actually wrote a file yield a path to unlink.
        Ok(rows.into_iter().filter_map(|(path,)| path).collect())
    }
}
#[cfg(feature = "portal")]
/// Background worker that turns pending export jobs into files on disk
/// under `export_dir`, and cleans up expired exports.
pub struct ExportProcessor {
pool: PgPool,
export_dir: PathBuf,
}
#[cfg(feature = "portal")]
impl ExportProcessor {
    /// Creates a processor that writes export files under `export_dir`.
    pub fn new(pool: PgPool, export_dir: PathBuf) -> Self {
        Self { pool, export_dir }
    }

    /// Processes every currently-pending job once.
    ///
    /// Returns the number of jobs *attempted* — jobs that error are marked
    /// failed (best-effort) and still count toward the total.
    pub async fn process_pending(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
        let repo = ExportJobRepository::new(&self.pool);
        let pending_jobs = repo.list_pending().await?;
        let count = pending_jobs.len();
        for job in pending_jobs {
            if let Err(e) = self.process_job(&job).await {
                tracing::error!("Failed to process export job {}: {}", job.id, e);
                // Best-effort: if recording the failure also fails, the job
                // stays 'processing' and will need manual attention.
                let _ = repo.mark_failed(job.id, &e.to_string()).await;
            }
        }
        Ok(count)
    }

    /// Runs a single job end-to-end: mark processing, gather the user's
    /// data, serialize it to `<export_dir>/<job_id>.<format>`, and record
    /// completion with the file's size.
    async fn process_job(
        &self,
        job: &ExportJob,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let repo = ExportJobRepository::new(&self.pool);
        repo.mark_processing(job.id).await?;
        tokio::fs::create_dir_all(&self.export_dir).await?;
        let data = self.gather_export_data(job).await?;
        let file_name = format!("{}.{}", job.id, job.format);
        let file_path = self.export_dir.join(&file_name);
        let content = match job.format.as_str() {
            "json" => serde_json::to_string_pretty(&data)?,
            "csv" => self.to_csv(&data)?,
            // Unknown formats fall back to JSON rather than failing the job.
            _ => serde_json::to_string_pretty(&data)?,
        };
        tokio::fs::write(&file_path, content.as_bytes()).await?;
        let metadata = tokio::fs::metadata(&file_path).await?;
        repo.mark_completed(
            job.id,
            file_path.to_string_lossy().as_ref(),
            metadata.len() as i64,
        )
        .await?;
        tracing::info!("Export job {} completed: {}", job.id, file_path.display());
        Ok(())
    }

    /// Assembles the export payload as JSON, honoring the job's include
    /// flags. Per-section query errors are silently skipped (the section is
    /// simply absent from the export) — deliberate best-effort behavior.
    async fn gather_export_data(
        &self,
        job: &ExportJob,
    ) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
        let mut data = serde_json::json!({
            "export_id": job.id,
            "user_id": job.user_id,
            "created_at": job.created_at.to_rfc3339(),
            "format": job.format,
        });
        if job.include_settings {
            let settings_repo = crate::portal::db::queries::SettingsRepository::new(&self.pool);
            if let Ok(settings) = settings_repo.get_all(job.user_id).await {
                let settings_data: Vec<serde_json::Value> = settings
                    .into_iter()
                    .map(|s| {
                        serde_json::json!({
                            "key": s.key,
                            "value": s.value,
                            "version": s.version,
                        })
                    })
                    .collect();
                data["settings"] = serde_json::Value::Array(settings_data);
            }
        }
        if job.include_api_keys {
            let keys_repo = crate::portal::db::queries::ApiKeyRepository::new(&self.pool);
            if let Ok(keys) = keys_repo.list_for_user(job.user_id).await {
                // Only key metadata is exported — never secret material.
                let keys_data: Vec<serde_json::Value> = keys
                    .into_iter()
                    .map(|k| {
                        serde_json::json!({
                            "id": k.id,
                            "name": k.name,
                            "scopes": k.scopes,
                            "created_at": k.created_at.to_rfc3339(),
                            "expires_at": k.expires_at.map(|d| d.to_rfc3339()),
                        })
                    })
                    .collect();
                data["api_keys"] = serde_json::Value::Array(keys_data);
            }
        }
        if job.include_usage_stats {
            data["usage_stats"] = serde_json::json!({
                "note": "Usage statistics export not yet implemented"
            });
        }
        Ok(data)
    }

    /// Escapes a single CSV field per RFC 4180: fields containing a comma,
    /// double quote, CR, or LF are wrapped in double quotes with embedded
    /// quotes doubled; everything else passes through unchanged.
    fn csv_field(value: &str) -> String {
        if value.contains(|c| matches!(c, ',' | '"' | '\n' | '\r')) {
            format!("\"{}\"", value.replace('"', "\"\""))
        } else {
            value.to_string()
        }
    }

    /// Flattens the JSON export into a three-column CSV (section,key,value).
    ///
    /// Fixed: the previous version replaced commas in setting values with
    /// semicolons (silently corrupting data) and emitted user-controlled
    /// API-key names completely unescaped, so a name containing a comma,
    /// quote, or newline broke the row structure. All variable fields now
    /// go through `csv_field`.
    fn to_csv(
        &self,
        data: &serde_json::Value,
    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        let mut csv = String::new();
        csv.push_str("section,key,value\n");
        if let Some(settings) = data.get("settings").and_then(|s| s.as_array()) {
            for setting in settings {
                let key = setting.get("key").and_then(|k| k.as_str()).unwrap_or("");
                let value = setting
                    .get("value")
                    .map(|v| v.to_string())
                    .unwrap_or_default();
                csv.push_str(&format!(
                    "settings,{},{}\n",
                    Self::csv_field(key),
                    Self::csv_field(&value)
                ));
            }
        }
        if let Some(keys) = data.get("api_keys").and_then(|k| k.as_array()) {
            for key in keys {
                let name = key.get("name").and_then(|n| n.as_str()).unwrap_or("");
                let id = key.get("id").and_then(|i| i.as_str()).unwrap_or("");
                csv.push_str(&format!(
                    "api_keys,{},{}\n",
                    Self::csv_field(name),
                    Self::csv_field(id)
                ));
            }
        }
        Ok(csv)
    }

    /// Deletes expired job rows and their files. Returns the number of
    /// file paths returned by the repository; unlink failures are logged
    /// and skipped (the row is already gone).
    pub async fn cleanup_expired(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
        let repo = ExportJobRepository::new(&self.pool);
        let file_paths = repo.delete_expired().await?;
        let count = file_paths.len();
        for path in file_paths {
            if let Err(e) = tokio::fs::remove_file(&path).await {
                tracing::warn!("Failed to delete expired export file {}: {}", path, e);
            }
        }
        Ok(count)
    }
}
#[cfg(feature = "portal")]
/// POST handler: validates the requested options and enqueues a new
/// export job for the authenticated user. Returns 201 with the job's
/// API representation on success.
pub async fn create_export(
    State(state): State<PortalState>,
    AuthClaims(claims): AuthClaims,
    Json(options): Json<ExportOptions>,
) -> impl IntoResponse {
    // The JWT subject must be a UUID naming the portal user.
    let Ok(user_id) = Uuid::parse_str(&claims.sub) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Invalid user ID" })),
        )
            .into_response();
    };
    if !matches!(options.format.as_str(), "json" | "csv") {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Invalid format. Allowed: json, csv" })),
        )
            .into_response();
    }
    let repo = ExportJobRepository::new(state.db.pool());
    match repo.create(user_id, &options).await {
        Ok(job) => (
            StatusCode::CREATED,
            Json(ExportJobResponse::from_job(job, None)),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to create export job: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to create export job" })),
            )
                .into_response()
        }
    }
}
#[cfg(feature = "portal")]
/// GET handler: returns the status of one export job, 404 if it does not
/// exist or belongs to another user.
pub async fn get_export_status(
    State(state): State<PortalState>,
    AuthClaims(claims): AuthClaims,
    Path(job_id): Path<Uuid>,
) -> impl IntoResponse {
    // The JWT subject must be a UUID naming the portal user.
    let Ok(user_id) = Uuid::parse_str(&claims.sub) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Invalid user ID" })),
        )
            .into_response();
    };
    let repo = ExportJobRepository::new(state.db.pool());
    let lookup = repo.get_for_user(job_id, user_id).await;
    match lookup {
        Ok(Some(job)) => {
            let body = ExportJobResponse::from_job(job, None);
            (StatusCode::OK, Json(body)).into_response()
        }
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({ "error": "Export job not found" })),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to get export job: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to get export job" })),
            )
                .into_response()
        }
    }
}
#[cfg(feature = "portal")]
/// GET handler: streams a completed export file back to its owner.
///
/// Returns 400 for a malformed user id or a job that is not yet
/// completed, 404 when the job does not exist or belongs to another user,
/// and 500 when the recorded file is missing or unreadable.
pub async fn download_export(
    State(state): State<PortalState>,
    AuthClaims(claims): AuthClaims,
    Path(job_id): Path<Uuid>,
) -> impl IntoResponse {
    let user_id = match Uuid::parse_str(&claims.sub) {
        Ok(id) => id,
        Err(_) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({ "error": "Invalid user ID" })),
            )
                .into_response();
        }
    };
    let repo = ExportJobRepository::new(state.db.pool());
    // Ownership is enforced by the query (filters on job id AND user id).
    let job = match repo.get_for_user(job_id, user_id).await {
        Ok(Some(job)) => job,
        Ok(None) => {
            return (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({ "error": "Export job not found" })),
            )
                .into_response();
        }
        Err(e) => {
            tracing::error!("Failed to get export job: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to get export job" })),
            )
                .into_response();
        }
    };
    if job.status != "completed" {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Export not ready for download" })),
        )
            .into_response();
    }
    let file_path = match job.file_path {
        Some(path) => path,
        None => {
            // A completed job should always carry a path; treat the
            // inconsistency as a server error rather than a client one.
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Export file path not found" })),
            )
                .into_response();
        }
    };
    let file = match tokio::fs::File::open(&file_path).await {
        Ok(f) => f,
        Err(e) => {
            tracing::error!("Failed to open export file: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to read export file" })),
            )
                .into_response();
        }
    };
    // Stream the file rather than buffering it all in memory.
    let stream = ReaderStream::new(file);
    let body = Body::from_stream(stream);
    let content_type = match job.format.as_str() {
        "json" => "application/json",
        "csv" => "text/csv",
        _ => "application/octet-stream",
    };
    let file_name = format!("reasonkit-export-{}.{}", job.id, job.format);
    let mut builder = Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", content_type)
        .header(
            "Content-Disposition",
            format!("attachment; filename=\"{}\"", file_name),
        );
    // Advertise the size recorded at completion so clients can show
    // download progress (the file is written once and not mutated after).
    if let Some(size) = job.file_size_bytes {
        builder = builder.header("Content-Length", size.to_string());
    }
    // Fixed: the original called `.unwrap()` here, turning a malformed
    // header value (e.g. an unexpected `format` string from the DB) into
    // a handler panic instead of a 500.
    match builder.body(body) {
        Ok(response) => response.into_response(),
        Err(e) => {
            tracing::error!("Failed to build download response: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to read export file" })),
            )
                .into_response()
        }
    }
}
#[cfg(feature = "portal")]
/// GET handler: lists the authenticated user's recent export jobs
/// (the repository caps the result at 50, newest first).
pub async fn export_history(
    State(state): State<PortalState>,
    AuthClaims(claims): AuthClaims,
) -> impl IntoResponse {
    // The JWT subject must be a UUID naming the portal user.
    let Ok(user_id) = Uuid::parse_str(&claims.sub) else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({ "error": "Invalid user ID" })),
        )
            .into_response();
    };
    let repo = ExportJobRepository::new(state.db.pool());
    match repo.list_for_user(user_id).await {
        Ok(jobs) => {
            let payload = jobs
                .into_iter()
                .map(|job| ExportJobResponse::from_job(job, None))
                .collect::<Vec<ExportJobResponse>>();
            (StatusCode::OK, Json(payload)).into_response()
        }
        Err(e) => {
            tracing::error!("Failed to list export jobs: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": "Failed to list export jobs" })),
            )
                .into_response()
        }
    }
}
#[cfg(all(test, feature = "portal"))]
mod tests {
    use super::*;

    /// Display must emit the exact lowercase strings used in SQL/JSON.
    #[test]
    fn test_export_status_display() {
        let cases = [
            (ExportStatus::Pending, "pending"),
            (ExportStatus::Processing, "processing"),
            (ExportStatus::Completed, "completed"),
            (ExportStatus::Failed, "failed"),
        ];
        for (status, expected) in cases {
            assert_eq!(status.to_string(), expected);
        }
    }

    /// Display must emit the exact lowercase strings used in SQL/JSON.
    #[test]
    fn test_export_format_display() {
        for (format, expected) in [(ExportFormat::Json, "json"), (ExportFormat::Csv, "csv")] {
            assert_eq!(format.to_string(), expected);
        }
    }

    /// Default must mirror the serde field defaults.
    #[test]
    fn test_export_options_default() {
        let options = ExportOptions::default();
        assert_eq!(options.format, "json");
        assert!(options.include_settings);
        assert!(options.include_api_keys);
        assert!(!options.include_usage_stats);
    }
}