use anyhow::{Result, anyhow};
use systemprompt_database::DbPool;
use systemprompt_identifiers::{SessionId, TaskId, TraceId, UserId};
use systemprompt_models::{RequestContext, TaskMetadata};
use crate::models::{Message, Task, TaskState, TaskStatus};
use crate::repository::task::{TaskRepository, UpdateTaskAndSaveMessagesParams};
use crate::services::ArtifactPublishingService;
#[derive(Debug)]
pub struct PersistCompletedTaskServiceParams<'a> {
pub task: &'a Task,
pub user_message: &'a Message,
pub agent_message: &'a Message,
pub context: &'a RequestContext,
pub artifacts_already_published: bool,
}
#[derive(Debug)]
pub struct PersistenceService {
db_pool: DbPool,
}
impl PersistenceService {
pub const fn new(db_pool: DbPool) -> Self {
Self { db_pool }
}
pub async fn create_task(
&self,
task: &Task,
context: &RequestContext,
agent_name: &str,
) -> Result<()> {
let task_repo = TaskRepository::new(&self.db_pool)?;
task_repo
.create_task(crate::repository::task::RepoCreateTaskParams {
task,
user_id: &UserId::new(context.user_id().as_str()),
session_id: &SessionId::new(context.session_id().as_str()),
trace_id: &TraceId::new(context.trace_id().as_str()),
agent_name,
})
.await
.map_err(|e| anyhow!("Failed to persist task at start: {}", e))?;
tracing::info!(task_id = %task.id, "Task persisted to database");
Ok(())
}
pub async fn update_task_state(
&self,
task_id: &TaskId,
state: TaskState,
timestamp: &chrono::DateTime<chrono::Utc>,
) -> Result<()> {
let task_repo = TaskRepository::new(&self.db_pool)?;
task_repo
.update_task_state(task_id, state, timestamp)
.await
.map_err(|e| anyhow!("Failed to update task state: {}", e))
}
pub async fn persist_completed_task(
&self,
params: PersistCompletedTaskServiceParams<'_>,
) -> Result<Task> {
let PersistCompletedTaskServiceParams {
task,
user_message,
agent_message,
context,
artifacts_already_published,
} = params;
let task_repo = TaskRepository::new(&self.db_pool)?;
let updated_task = task_repo
.update_task_and_save_messages(UpdateTaskAndSaveMessagesParams {
task,
user_message,
agent_message,
user_id: Some(context.user_id()),
session_id: context.session_id(),
trace_id: context.trace_id(),
})
.await
.map_err(|e| anyhow!("Failed to update task and save messages: {}", e))?;
if !artifacts_already_published {
if let Some(ref artifacts) = task.artifacts {
let publishing_service = ArtifactPublishingService::new(&self.db_pool)?;
for artifact in artifacts {
publishing_service
.publish_from_a2a(artifact, &task.id, &task.context_id)
.await
.map_err(|e| {
anyhow!("Failed to publish artifact {}: {}", artifact.id, e)
})?;
}
tracing::info!(
task_id = %task.id,
artifact_count = artifacts.len(),
"Published artifacts for task"
);
}
}
tracing::info!(
task_id = %task.id,
context_id = %task.context_id,
user_id = %context.user_id(),
"Persisted task"
);
Ok(updated_task)
}
pub fn build_initial_task(
task_id: TaskId,
context_id: systemprompt_identifiers::ContextId,
agent_name: &str,
) -> Task {
let metadata = TaskMetadata::new_agent_message(agent_name.to_string());
Task {
id: task_id,
context_id,
status: TaskStatus {
state: TaskState::Submitted,
message: None,
timestamp: Some(chrono::Utc::now()),
},
history: None,
artifacts: None,
metadata: Some(metadata),
created_at: Some(chrono::Utc::now()),
last_modified: Some(chrono::Utc::now()),
}
}
}