systemprompt_agent/services/a2a_server/processing/
persistence_service.rs1use anyhow::{Result, anyhow};
2use systemprompt_database::DbPool;
3use systemprompt_identifiers::{SessionId, TaskId, TraceId, UserId};
4use systemprompt_models::{RequestContext, TaskMetadata};
5
6use crate::models::{Message, Task, TaskState, TaskStatus};
7use crate::repository::task::{TaskRepository, UpdateTaskAndSaveMessagesParams};
8use crate::services::ArtifactPublishingService;
9
10#[derive(Debug)]
11pub struct PersistCompletedTaskServiceParams<'a> {
12 pub task: &'a Task,
13 pub user_message: &'a Message,
14 pub agent_message: &'a Message,
15 pub context: &'a RequestContext,
16 pub artifacts_already_published: bool,
17}
18
19#[derive(Debug)]
20pub struct PersistenceService {
21 db_pool: DbPool,
22}
23
24impl PersistenceService {
25 pub const fn new(db_pool: DbPool) -> Self {
26 Self { db_pool }
27 }
28
29 pub async fn create_task(
30 &self,
31 task: &Task,
32 context: &RequestContext,
33 agent_name: &str,
34 ) -> Result<()> {
35 let task_repo = TaskRepository::new(&self.db_pool)?;
36
37 task_repo
38 .create_task(crate::repository::task::RepoCreateTaskParams {
39 task,
40 user_id: &UserId::new(context.user_id().as_str()),
41 session_id: &SessionId::new(context.session_id().as_str()),
42 trace_id: &TraceId::new(context.trace_id().as_str()),
43 agent_name,
44 })
45 .await
46 .map_err(|e| anyhow!("Failed to persist task at start: {}", e))?;
47
48 tracing::info!(task_id = %task.id, "Task persisted to database");
49
50 Ok(())
51 }
52
53 pub async fn update_task_state(
54 &self,
55 task_id: &TaskId,
56 state: TaskState,
57 timestamp: &chrono::DateTime<chrono::Utc>,
58 ) -> Result<()> {
59 let task_repo = TaskRepository::new(&self.db_pool)?;
60 task_repo
61 .update_task_state(task_id, state, timestamp)
62 .await
63 .map_err(|e| anyhow!("Failed to update task state: {}", e))
64 }
65
66 pub async fn persist_completed_task(
67 &self,
68 params: PersistCompletedTaskServiceParams<'_>,
69 ) -> Result<Task> {
70 let PersistCompletedTaskServiceParams {
71 task,
72 user_message,
73 agent_message,
74 context,
75 artifacts_already_published,
76 } = params;
77 let task_repo = TaskRepository::new(&self.db_pool)?;
78
79 let updated_task = task_repo
80 .update_task_and_save_messages(UpdateTaskAndSaveMessagesParams {
81 task,
82 user_message,
83 agent_message,
84 user_id: Some(context.user_id()),
85 session_id: context.session_id(),
86 trace_id: context.trace_id(),
87 })
88 .await
89 .map_err(|e| anyhow!("Failed to update task and save messages: {}", e))?;
90
91 if !artifacts_already_published {
92 if let Some(ref artifacts) = task.artifacts {
93 let publishing_service = ArtifactPublishingService::new(&self.db_pool)?;
94 for artifact in artifacts {
95 publishing_service
96 .publish_from_a2a(artifact, &task.id, &task.context_id)
97 .await
98 .map_err(|e| {
99 anyhow!("Failed to publish artifact {}: {}", artifact.id, e)
100 })?;
101 }
102
103 tracing::info!(
104 task_id = %task.id,
105 artifact_count = artifacts.len(),
106 "Published artifacts for task"
107 );
108 }
109 }
110
111 tracing::info!(
112 task_id = %task.id,
113 context_id = %task.context_id,
114 user_id = %context.user_id(),
115 "Persisted task"
116 );
117
118 Ok(updated_task)
119 }
120
121 pub fn build_initial_task(
122 task_id: TaskId,
123 context_id: systemprompt_identifiers::ContextId,
124 agent_name: &str,
125 ) -> Task {
126 let metadata = TaskMetadata::new_agent_message(agent_name.to_string());
127
128 Task {
129 id: task_id,
130 context_id,
131 status: TaskStatus {
132 state: TaskState::Submitted,
133 message: None,
134 timestamp: Some(chrono::Utc::now()),
135 },
136 history: None,
137 artifacts: None,
138 metadata: Some(metadata),
139 kind: "task".to_string(),
140 }
141 }
142}