use anyhow::Result;
use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue::Set, IntoActiveModel, QueryOrder};
use serde::{Deserialize, Serialize};
/// Lifecycle state of a task row, persisted as a lowercase string.
///
/// The `string_value` on each variant is the exact text stored in the `status` column.
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(StringLen::None)")]
pub enum TaskStatus {
/// Status given to newly inserted tasks and restored by `Model::upsert`,
/// `Model::upsert_with_repo` and `Model::reset_to_pending`; the status selected by
/// `Model::find_pending`.
#[sea_orm(string_value = "pending")]
Pending,
/// Set by `Model::set_in_progress`.
#[sea_orm(string_value = "in_progress")]
InProgress,
/// Set by `Model::set_done`, together with the branch name and PR URL.
#[sea_orm(string_value = "done")]
Done,
/// Set by `Model::set_failed`, together with an error message. `Model::upsert` moves
/// tasks in this status back to `Pending`.
#[sea_orm(string_value = "failed")]
Failed,
/// Set by `Model::set_unfinished`, together with an error message. Unlike `Failed`,
/// `Model::upsert` leaves tasks in this status untouched.
#[sea_orm(string_value = "unfinished")]
Unfinished,
}
impl std::fmt::Display for TaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TaskStatus::Pending => write!(f, "pending"),
TaskStatus::InProgress => write!(f, "in_progress"),
TaskStatus::Done => write!(f, "done"),
TaskStatus::Failed => write!(f, "failed"),
TaskStatus::Unfinished => write!(f, "unfinished"),
}
}
}
/// One row of the `tasks` table: a Jira issue together with its processing state.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "tasks")]
pub struct Model {
/// Primary key of the row.
#[sea_orm(primary_key)]
pub id: i32,
/// Jira issue key; declared unique, and the lookup key used by most `Model` methods.
#[sea_orm(unique)]
pub jira_key: String,
/// Issue summary.
pub summary: String,
/// Optional issue description.
pub description: Option<String>,
/// Optional assignee.
pub assignee: Option<String>,
/// Project the issue belongs to.
pub project: String,
/// Current processing state.
pub status: TaskStatus,
/// Written by `Model::set_done`; cleared by `Model::reset_to_pending`.
pub branch_name: Option<String>,
/// Written by `Model::set_done`; cleared by `Model::reset_to_pending`.
pub pr_url: Option<String>,
/// Written by `Model::set_failed` / `Model::set_unfinished`; cleared when the task is
/// put back to `Pending`.
pub error_message: Option<String>,
/// RFC 3339 timestamp (UTC) recorded when the task was inserted or last upserted.
pub fetched_at: String,
/// RFC 3339 timestamp (UTC) recorded by `set_done`, `set_failed` and `set_unfinished`;
/// `None` while the task has not been processed.
pub processed_at: Option<String>,
/// Repository URL. Stored as an empty string by `Model::upsert` and `Model::insert_new`;
/// `Model::upsert_with_repo` stores a value that passed `validate_repo_url`.
pub repo_url: String,
}
/// Relations of the `tasks` entity; none are defined.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
// No hooks are overridden: the active model uses the trait's default behavior.
impl ActiveModelBehavior for ActiveModel {}
/// Checks that `url` is acceptable as a task's repository URL.
///
/// # Errors
/// * `url` is empty.
/// * `url` does not start with `git://`; the offending value is included in the message.
pub fn validate_repo_url(url: &str) -> Result<()> {
    match url {
        "" => anyhow::bail!("repo_url cannot be empty"),
        other if !other.starts_with("git://") => anyhow::bail!(
            "repo_url must use the git:// protocol, got: {}",
            other
        ),
        _ => Ok(()),
    }
}
impl Model {
pub async fn upsert(
db: &DatabaseConnection,
jira_key: &str,
summary: &str,
description: Option<&str>,
assignee: Option<&str>,
project: &str,
) -> Result<()> {
let now = Utc::now().to_rfc3339();
let existing = Entity::find()
.filter(Column::JiraKey.eq(jira_key))
.one(db)
.await?;
match existing {
Some(task) => {
if task.status == TaskStatus::Pending || task.status == TaskStatus::Failed {
let mut active: ActiveModel = task.into_active_model();
active.summary = Set(summary.to_string());
active.description = Set(description.map(|s| s.to_string()));
active.assignee = Set(assignee.map(|s| s.to_string()));
active.fetched_at = Set(now);
active.status = Set(TaskStatus::Pending);
active.error_message = Set(None);
active.processed_at = Set(None);
active.update(db).await?;
}
}
None => {
let active = ActiveModel {
jira_key: Set(jira_key.to_string()),
summary: Set(summary.to_string()),
description: Set(description.map(|s| s.to_string())),
assignee: Set(assignee.map(|s| s.to_string())),
project: Set(project.to_string()),
status: Set(TaskStatus::Pending),
fetched_at: Set(now),
repo_url: Set(String::new()),
..Default::default()
};
active.insert(db).await?;
}
}
Ok(())
}
pub async fn upsert_with_repo(
db: &DatabaseConnection,
jira_key: &str,
summary: &str,
description: &str,
repo_url: &str,
project: &str,
) -> Result<()> {
validate_repo_url(repo_url)?;
let now = Utc::now().to_rfc3339();
let existing = Entity::find()
.filter(Column::JiraKey.eq(jira_key))
.one(db)
.await?;
match existing {
Some(task) => {
let mut active: ActiveModel = task.into_active_model();
active.summary = Set(summary.to_string());
active.description = Set(Some(description.to_string()));
active.repo_url = Set(repo_url.to_string());
active.fetched_at = Set(now);
active.status = Set(TaskStatus::Pending);
active.error_message = Set(None);
active.processed_at = Set(None);
active.update(db).await?;
}
None => {
let active = ActiveModel {
jira_key: Set(jira_key.to_string()),
summary: Set(summary.to_string()),
description: Set(Some(description.to_string())),
project: Set(project.to_string()),
repo_url: Set(repo_url.to_string()),
status: Set(TaskStatus::Pending),
fetched_at: Set(now),
..Default::default()
};
active.insert(db).await?;
}
}
Ok(())
}
pub async fn find_pending(db: &DatabaseConnection) -> Result<Vec<Model>> {
let tasks = Entity::find()
.filter(Column::Status.eq(TaskStatus::Pending))
.order_by_asc(Column::FetchedAt)
.all(db)
.await?;
Ok(tasks)
}
pub async fn find_all(db: &DatabaseConnection) -> Result<Vec<Model>> {
let tasks = Entity::find()
.order_by_desc(Column::FetchedAt)
.all(db)
.await?;
Ok(tasks)
}
pub async fn find_by_key(db: &DatabaseConnection, jira_key: &str) -> Result<Option<Model>> {
let task = Entity::find()
.filter(Column::JiraKey.eq(jira_key))
.one(db)
.await?;
Ok(task)
}
pub async fn find_by_id(db: &DatabaseConnection, id: i32) -> Result<Option<Model>> {
let task = Entity::find_by_id(id).one(db).await?;
Ok(task)
}
pub async fn insert_new(
db: &DatabaseConnection,
jira_key: &str,
summary: &str,
description: Option<&str>,
assignee: Option<&str>,
project: &str,
) -> Result<i32> {
let now = Utc::now().to_rfc3339();
let active = ActiveModel {
jira_key: Set(jira_key.to_string()),
summary: Set(summary.to_string()),
description: Set(description.map(|s| s.to_string())),
assignee: Set(assignee.map(|s| s.to_string())),
project: Set(project.to_string()),
status: Set(TaskStatus::Pending),
fetched_at: Set(now),
repo_url: Set(String::new()),
..Default::default()
};
let result = active.insert(db).await?;
Ok(result.id)
}
pub async fn set_in_progress(db: &DatabaseConnection, jira_key: &str) -> Result<()> {
Entity::update_many()
.col_expr(Column::Status, Expr::value(TaskStatus::InProgress.to_string()))
.filter(Column::JiraKey.eq(jira_key))
.exec(db)
.await?;
Ok(())
}
pub async fn set_done(
db: &DatabaseConnection,
jira_key: &str,
branch_name: &str,
pr_url: &str,
) -> Result<()> {
let now = Utc::now().to_rfc3339();
Entity::update_many()
.col_expr(Column::Status, Expr::value(TaskStatus::Done.to_string()))
.col_expr(Column::BranchName, Expr::value(branch_name))
.col_expr(Column::PrUrl, Expr::value(pr_url))
.col_expr(Column::ProcessedAt, Expr::value(now))
.filter(Column::JiraKey.eq(jira_key))
.exec(db)
.await?;
Ok(())
}
pub async fn set_failed(db: &DatabaseConnection, jira_key: &str, error: &str) -> Result<()> {
let now = Utc::now().to_rfc3339();
Entity::update_many()
.col_expr(Column::Status, Expr::value(TaskStatus::Failed.to_string()))
.col_expr(Column::ErrorMessage, Expr::value(error))
.col_expr(Column::ProcessedAt, Expr::value(now))
.filter(Column::JiraKey.eq(jira_key))
.exec(db)
.await?;
Ok(())
}
pub async fn set_unfinished(db: &DatabaseConnection, jira_key: &str, error: &str) -> Result<()> {
let now = Utc::now().to_rfc3339();
Entity::update_many()
.col_expr(Column::Status, Expr::value(TaskStatus::Unfinished.to_string()))
.col_expr(Column::ErrorMessage, Expr::value(error))
.col_expr(Column::ProcessedAt, Expr::value(now))
.filter(Column::JiraKey.eq(jira_key))
.exec(db)
.await?;
Ok(())
}
pub async fn reset_to_pending(db: &DatabaseConnection, jira_key: &str) -> Result<()> {
Entity::update_many()
.col_expr(Column::Status, Expr::value(TaskStatus::Pending.to_string()))
.col_expr(Column::ErrorMessage, Expr::value(Option::<String>::None))
.col_expr(Column::BranchName, Expr::value(Option::<String>::None))
.col_expr(Column::PrUrl, Expr::value(Option::<String>::None))
.col_expr(Column::ProcessedAt, Expr::value(Option::<String>::None))
.filter(Column::JiraKey.eq(jira_key))
.exec(db)
.await?;
Ok(())
}
pub async fn reset_all(db: &DatabaseConnection) -> Result<u64> {
let result = Entity::delete_many().exec(db).await?;
Ok(result.rows_affected)
}
pub async fn update_task(
db: &DatabaseConnection,
id: i32,
summary: &str,
description: Option<&str>,
assignee: Option<&str>,
project: &str,
status: &str,
) -> Result<u64> {
let result = Entity::update_many()
.col_expr(Column::Summary, Expr::value(summary))
.col_expr(Column::Description, Expr::value(description))
.col_expr(Column::Assignee, Expr::value(assignee))
.col_expr(Column::Project, Expr::value(project))
.col_expr(Column::Status, Expr::value(status))
.filter(Column::Id.eq(id))
.exec(db)
.await?;
Ok(result.rows_affected)
}
pub async fn delete(db: &DatabaseConnection, id: i32) -> Result<u64> {
let result = Entity::delete_by_id(id).exec(db).await?;
Ok(result.rows_affected)
}
}