Skip to main content

systemprompt_agent/services/a2a_server/processing/
persistence_service.rs

1use anyhow::{anyhow, Result};
2use systemprompt_database::DbPool;
3use systemprompt_identifiers::{SessionId, TaskId, TraceId, UserId};
4use systemprompt_models::{RequestContext, TaskMetadata};
5
6use crate::models::{Message, Task, TaskState, TaskStatus};
7use crate::repository::task::TaskRepository;
8use crate::services::ArtifactPublishingService;
9
10#[derive(Debug)]
11pub struct PersistenceService {
12    db_pool: DbPool,
13}
14
15impl PersistenceService {
16    pub const fn new(db_pool: DbPool) -> Self {
17        Self { db_pool }
18    }
19
20    pub async fn create_task(
21        &self,
22        task: &Task,
23        context: &RequestContext,
24        agent_name: &str,
25    ) -> Result<()> {
26        let task_repo = TaskRepository::new(self.db_pool.clone());
27
28        task_repo
29            .create_task(
30                task,
31                &UserId::new(context.user_id().as_str()),
32                &SessionId::new(context.session_id().as_str()),
33                &TraceId::new(context.trace_id().as_str()),
34                agent_name,
35            )
36            .await
37            .map_err(|e| anyhow!("Failed to persist task at start: {}", e))?;
38
39        tracing::info!(task_id = %task.id, "Task persisted to database");
40
41        Ok(())
42    }
43
44    pub async fn update_task_state(
45        &self,
46        task_id: &TaskId,
47        state: TaskState,
48        timestamp: &chrono::DateTime<chrono::Utc>,
49    ) -> Result<()> {
50        let task_repo = TaskRepository::new(self.db_pool.clone());
51        task_repo
52            .update_task_state(task_id, state, timestamp)
53            .await
54            .map_err(|e| anyhow!("Failed to update task state: {}", e))
55    }
56
57    pub async fn persist_completed_task(
58        &self,
59        task: &Task,
60        user_message: &Message,
61        agent_message: &Message,
62        context: &RequestContext,
63        artifacts_already_published: bool,
64    ) -> Result<Task> {
65        let task_repo = TaskRepository::new(self.db_pool.clone());
66
67        let updated_task = task_repo
68            .update_task_and_save_messages(
69                task,
70                user_message,
71                agent_message,
72                Some(context.user_id()),
73                context.session_id(),
74                context.trace_id(),
75            )
76            .await
77            .map_err(|e| anyhow!("Failed to update task and save messages: {}", e))?;
78
79        if !artifacts_already_published {
80            if let Some(ref artifacts) = task.artifacts {
81                let publishing_service = ArtifactPublishingService::new(self.db_pool.clone());
82                for artifact in artifacts {
83                    publishing_service
84                        .publish_from_a2a(artifact, &task.id, &task.context_id)
85                        .await
86                        .map_err(|e| {
87                            anyhow!("Failed to publish artifact {}: {}", artifact.id, e)
88                        })?;
89                }
90
91                tracing::info!(
92                    task_id = %task.id,
93                    artifact_count = artifacts.len(),
94                    "Published artifacts for task"
95                );
96            }
97        }
98
99        tracing::info!(
100            task_id = %task.id,
101            context_id = %task.context_id,
102            user_id = %context.user_id(),
103            "Persisted task"
104        );
105
106        Ok(updated_task)
107    }
108
109    pub fn build_initial_task(
110        &self,
111        task_id: TaskId,
112        context_id: systemprompt_identifiers::ContextId,
113        agent_name: &str,
114    ) -> Task {
115        let metadata = TaskMetadata::new_agent_message(agent_name.to_string());
116
117        Task {
118            id: task_id,
119            context_id,
120            status: TaskStatus {
121                state: TaskState::Submitted,
122                message: None,
123                timestamp: Some(chrono::Utc::now()),
124            },
125            history: None,
126            artifacts: None,
127            metadata: Some(metadata),
128            kind: "task".to_string(),
129        }
130    }
131}