Skip to main content

systemprompt_agent/services/a2a_server/processing/
persistence_service.rs

1use anyhow::{Result, anyhow};
2use systemprompt_database::DbPool;
3use systemprompt_identifiers::{SessionId, TaskId, TraceId, UserId};
4use systemprompt_models::{RequestContext, TaskMetadata};
5
6use crate::models::{Message, Task, TaskState, TaskStatus};
7use crate::repository::task::{TaskRepository, UpdateTaskAndSaveMessagesParams};
8use crate::services::ArtifactPublishingService;
9
10#[derive(Debug)]
11pub struct PersistCompletedTaskServiceParams<'a> {
12    pub task: &'a Task,
13    pub user_message: &'a Message,
14    pub agent_message: &'a Message,
15    pub context: &'a RequestContext,
16    pub artifacts_already_published: bool,
17}
18
19#[derive(Debug)]
20pub struct PersistenceService {
21    db_pool: DbPool,
22}
23
24impl PersistenceService {
25    pub const fn new(db_pool: DbPool) -> Self {
26        Self { db_pool }
27    }
28
29    pub async fn create_task(
30        &self,
31        task: &Task,
32        context: &RequestContext,
33        agent_name: &str,
34    ) -> Result<()> {
35        let task_repo = TaskRepository::new(&self.db_pool)?;
36
37        task_repo
38            .create_task(crate::repository::task::RepoCreateTaskParams {
39                task,
40                user_id: &UserId::new(context.user_id().as_str()),
41                session_id: &SessionId::new(context.session_id().as_str()),
42                trace_id: &TraceId::new(context.trace_id().as_str()),
43                agent_name,
44            })
45            .await
46            .map_err(|e| anyhow!("Failed to persist task at start: {}", e))?;
47
48        tracing::info!(task_id = %task.id, "Task persisted to database");
49
50        Ok(())
51    }
52
53    pub async fn update_task_state(
54        &self,
55        task_id: &TaskId,
56        state: TaskState,
57        timestamp: &chrono::DateTime<chrono::Utc>,
58    ) -> Result<()> {
59        let task_repo = TaskRepository::new(&self.db_pool)?;
60        task_repo
61            .update_task_state(task_id, state, timestamp)
62            .await
63            .map_err(|e| anyhow!("Failed to update task state: {}", e))
64    }
65
66    pub async fn persist_completed_task(
67        &self,
68        params: PersistCompletedTaskServiceParams<'_>,
69    ) -> Result<Task> {
70        let PersistCompletedTaskServiceParams {
71            task,
72            user_message,
73            agent_message,
74            context,
75            artifacts_already_published,
76        } = params;
77        let task_repo = TaskRepository::new(&self.db_pool)?;
78
79        let updated_task = task_repo
80            .update_task_and_save_messages(UpdateTaskAndSaveMessagesParams {
81                task,
82                user_message,
83                agent_message,
84                user_id: Some(context.user_id()),
85                session_id: context.session_id(),
86                trace_id: context.trace_id(),
87            })
88            .await
89            .map_err(|e| anyhow!("Failed to update task and save messages: {}", e))?;
90
91        if !artifacts_already_published {
92            if let Some(ref artifacts) = task.artifacts {
93                let publishing_service = ArtifactPublishingService::new(&self.db_pool)?;
94                for artifact in artifacts {
95                    publishing_service
96                        .publish_from_a2a(artifact, &task.id, &task.context_id)
97                        .await
98                        .map_err(|e| {
99                            anyhow!("Failed to publish artifact {}: {}", artifact.id, e)
100                        })?;
101                }
102
103                tracing::info!(
104                    task_id = %task.id,
105                    artifact_count = artifacts.len(),
106                    "Published artifacts for task"
107                );
108            }
109        }
110
111        tracing::info!(
112            task_id = %task.id,
113            context_id = %task.context_id,
114            user_id = %context.user_id(),
115            "Persisted task"
116        );
117
118        Ok(updated_task)
119    }
120
121    pub fn build_initial_task(
122        task_id: TaskId,
123        context_id: systemprompt_identifiers::ContextId,
124        agent_name: &str,
125    ) -> Task {
126        let metadata = TaskMetadata::new_agent_message(agent_name.to_string());
127
128        Task {
129            id: task_id,
130            context_id,
131            status: TaskStatus {
132                state: TaskState::Submitted,
133                message: None,
134                timestamp: Some(chrono::Utc::now()),
135            },
136            history: None,
137            artifacts: None,
138            metadata: Some(metadata),
139            kind: "task".to_string(),
140        }
141    }
142}