systemprompt_agent/services/a2a_server/processing/
persistence_service.rs1use anyhow::{anyhow, Result};
2use systemprompt_database::DbPool;
3use systemprompt_identifiers::{SessionId, TaskId, TraceId, UserId};
4use systemprompt_models::{RequestContext, TaskMetadata};
5
6use crate::models::{Message, Task, TaskState, TaskStatus};
7use crate::repository::task::TaskRepository;
8use crate::services::ArtifactPublishingService;
9
10#[derive(Debug)]
11pub struct PersistenceService {
12 db_pool: DbPool,
13}
14
15impl PersistenceService {
16 pub const fn new(db_pool: DbPool) -> Self {
17 Self { db_pool }
18 }
19
20 pub async fn create_task(
21 &self,
22 task: &Task,
23 context: &RequestContext,
24 agent_name: &str,
25 ) -> Result<()> {
26 let task_repo = TaskRepository::new(self.db_pool.clone());
27
28 task_repo
29 .create_task(
30 task,
31 &UserId::new(context.user_id().as_str()),
32 &SessionId::new(context.session_id().as_str()),
33 &TraceId::new(context.trace_id().as_str()),
34 agent_name,
35 )
36 .await
37 .map_err(|e| anyhow!("Failed to persist task at start: {}", e))?;
38
39 tracing::info!(task_id = %task.id, "Task persisted to database");
40
41 Ok(())
42 }
43
44 pub async fn update_task_state(
45 &self,
46 task_id: &TaskId,
47 state: TaskState,
48 timestamp: &chrono::DateTime<chrono::Utc>,
49 ) -> Result<()> {
50 let task_repo = TaskRepository::new(self.db_pool.clone());
51 task_repo
52 .update_task_state(task_id, state, timestamp)
53 .await
54 .map_err(|e| anyhow!("Failed to update task state: {}", e))
55 }
56
57 pub async fn persist_completed_task(
58 &self,
59 task: &Task,
60 user_message: &Message,
61 agent_message: &Message,
62 context: &RequestContext,
63 artifacts_already_published: bool,
64 ) -> Result<Task> {
65 let task_repo = TaskRepository::new(self.db_pool.clone());
66
67 let updated_task = task_repo
68 .update_task_and_save_messages(
69 task,
70 user_message,
71 agent_message,
72 Some(context.user_id()),
73 context.session_id(),
74 context.trace_id(),
75 )
76 .await
77 .map_err(|e| anyhow!("Failed to update task and save messages: {}", e))?;
78
79 if !artifacts_already_published {
80 if let Some(ref artifacts) = task.artifacts {
81 let publishing_service = ArtifactPublishingService::new(self.db_pool.clone());
82 for artifact in artifacts {
83 publishing_service
84 .publish_from_a2a(artifact, &task.id, &task.context_id)
85 .await
86 .map_err(|e| {
87 anyhow!("Failed to publish artifact {}: {}", artifact.id, e)
88 })?;
89 }
90
91 tracing::info!(
92 task_id = %task.id,
93 artifact_count = artifacts.len(),
94 "Published artifacts for task"
95 );
96 }
97 }
98
99 tracing::info!(
100 task_id = %task.id,
101 context_id = %task.context_id,
102 user_id = %context.user_id(),
103 "Persisted task"
104 );
105
106 Ok(updated_task)
107 }
108
109 pub fn build_initial_task(
110 &self,
111 task_id: TaskId,
112 context_id: systemprompt_identifiers::ContextId,
113 agent_name: &str,
114 ) -> Task {
115 let metadata = TaskMetadata::new_agent_message(agent_name.to_string());
116
117 Task {
118 id: task_id,
119 context_id,
120 status: TaskStatus {
121 state: TaskState::Submitted,
122 message: None,
123 timestamp: Some(chrono::Utc::now()),
124 },
125 history: None,
126 artifacts: None,
127 metadata: Some(metadata),
128 kind: "task".to_string(),
129 }
130 }
131}