Skip to main content

agent_sdk_store_postgres/
journal.rs

1use agent_sdk_core::{
2    AgentError, JournalCursor, JournalRecord, RunId, RunJournal, RunJournalReader,
3};
4use serde_json::Value;
5
6use crate::{
7    PostgresStoreClient,
8    util::{decode_row, json_value},
9};
10
11#[derive(Clone)]
12pub struct PostgresRunJournal {
13    client: PostgresStoreClient,
14}
15
16impl PostgresRunJournal {
17    pub fn new(client: PostgresStoreClient) -> Self {
18        Self { client }
19    }
20}
21
22impl RunJournal for PostgresRunJournal {
23    fn append(&self, record: JournalRecord) -> Result<JournalCursor, AgentError> {
24        self.client.execute(
25            format!(
26                "select {}.append_journal_record($1, $2, $3, $4)",
27                self.client.config.schema
28            ),
29            vec![
30                self.client.scope(),
31                Value::String(record.run_id.as_str().to_string()),
32                Value::from(record.journal_seq),
33                json_value(&record)?,
34            ],
35        )?;
36        Ok(JournalCursor::new(format!(
37            "journal.{}",
38            record.journal_seq
39        )))
40    }
41}
42
43impl RunJournalReader for PostgresRunJournal {
44    fn records_for_run(&self, run_id: &RunId) -> Result<Vec<JournalRecord>, AgentError> {
45        let response = self.client.execute(
46            format!(
47                "select record_json from {} where store_scope = $1 and run_id = $2 order by journal_seq asc",
48                self.client.table("agent_sdk_journal_records")
49            ),
50            vec![self.client.scope(), Value::String(run_id.as_str().to_string())],
51        )?;
52        response
53            .rows
54            .into_iter()
55            .map(|row| decode_row(row, "record_json"))
56            .collect()
57    }
58}