Skip to main content

agent_sdk_store_postgres/
content.rs

1use agent_sdk_core::{
2    ContentResolutionError, ContentResolutionErrorKind, ContentResolutionPolicy,
3    ContentResolveRequest, ContentResolver, ContentStore,
4    content::{ContentRef, ResolvedContent as CoreResolvedContent},
5};
6use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
7use serde_json::Value;
8
9use crate::{
10    PostgresStoreClient,
11    util::{content_error, decode_row},
12};
13
/// Content store that reads and writes content rows through a
/// [`PostgresStoreClient`].
///
/// Cloning the store clones the underlying client handle.
#[derive(Clone)]
pub struct PostgresContentStore {
    /// Client used to execute SQL statements and to supply the table name
    /// and store scope for every query.
    client: PostgresStoreClient,
}
18
19impl PostgresContentStore {
20    pub fn new(client: PostgresStoreClient) -> Self {
21        Self { client }
22    }
23}
24
25impl ContentResolver for PostgresContentStore {
26    fn resolve(
27        &self,
28        request: ContentResolveRequest,
29        policy: ContentResolutionPolicy,
30    ) -> Result<agent_sdk_core::ResolvedContent, ContentResolutionError> {
31        if request.requested_version != request.content_ref.version {
32            return Err(content_error(
33                ContentResolutionErrorKind::VersionMismatch,
34                request.content_ref,
35                policy.policy_refs,
36            ));
37        }
38        let response = self
39            .client
40            .execute(
41                format!("select content_json, bytes_base64 from {} where store_scope = $1 and content_id = $2", self.client.table("agent_sdk_content")),
42                vec![self.client.scope(), Value::String(request.content_ref.content_id.as_str().to_string())],
43            )
44            .map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, request.content_ref.clone(), policy.policy_refs.clone()))?;
45        let row = response.rows.into_iter().next().ok_or_else(|| {
46            content_error(
47                ContentResolutionErrorKind::Missing,
48                request.content_ref.clone(),
49                policy.policy_refs.clone(),
50            )
51        })?;
52        let content_ref: ContentRef = decode_row(row.clone(), "content_json").map_err(|_| {
53            content_error(
54                ContentResolutionErrorKind::StorageUnavailable,
55                request.content_ref.clone(),
56                policy.policy_refs.clone(),
57            )
58        })?;
59        if !policy.allow_raw_content {
60            return Ok(CoreResolvedContent::redacted(
61                content_ref,
62                policy.policy_refs,
63            ));
64        }
65        let bytes_base64 = row
66            .get("bytes_base64")
67            .and_then(Value::as_str)
68            .unwrap_or_default();
69        let bytes = BASE64.decode(bytes_base64).map_err(|_| {
70            content_error(
71                ContentResolutionErrorKind::StorageUnavailable,
72                request.content_ref.clone(),
73                policy.policy_refs.clone(),
74            )
75        })?;
76        if bytes.len() as u64 > policy.max_bytes {
77            return Err(content_error(
78                ContentResolutionErrorKind::MaxBytesExceeded,
79                request.content_ref,
80                policy.policy_refs,
81            ));
82        }
83        Ok(agent_sdk_core::ResolvedContent {
84            mime: content_ref.mime.clone(),
85            redacted_summary: content_ref.redacted_summary.clone(),
86            content_ref,
87            bytes: Some(bytes),
88            policy_refs: policy.policy_refs,
89            raw_content_included: true,
90        })
91    }
92
93    fn store_resolved_content(
94        &self,
95        content_ref: &ContentRef,
96        bytes: Vec<u8>,
97    ) -> Result<(), ContentResolutionError> {
98        self.put_content(content_ref, bytes)
99    }
100}
101
102impl ContentStore for PostgresContentStore {
103    fn put_content(
104        &self,
105        content_ref: &ContentRef,
106        bytes: Vec<u8>,
107    ) -> Result<(), ContentResolutionError> {
108        self.client
109            .execute(
110                format!("insert into {} (store_scope, content_id, content_json, bytes_base64) values ($1, $2, $3, $4) on conflict (store_scope, content_id) do update set content_json = excluded.content_json, bytes_base64 = excluded.bytes_base64", self.client.table("agent_sdk_content")),
111                vec![
112                    self.client.scope(),
113                    Value::String(content_ref.content_id.as_str().to_string()),
114                    serde_json::to_value(content_ref).map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, content_ref.clone(), Vec::new()))?,
115                    Value::String(BASE64.encode(bytes)),
116                ],
117            )
118            .map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, content_ref.clone(), Vec::new()))?;
119        Ok(())
120    }
121}