agent_sdk_store_postgres/
content.rs1use agent_sdk_core::{
2 ContentResolutionError, ContentResolutionErrorKind, ContentResolutionPolicy,
3 ContentResolveRequest, ContentResolver, ContentStore,
4 content::{ContentRef, ResolvedContent as CoreResolvedContent},
5};
6use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
7use serde_json::Value;
8
9use crate::{
10 PostgresStoreClient,
11 util::{content_error, decode_row},
12};
13
14#[derive(Clone)]
15pub struct PostgresContentStore {
16 client: PostgresStoreClient,
17}
18
19impl PostgresContentStore {
20 pub fn new(client: PostgresStoreClient) -> Self {
21 Self { client }
22 }
23}
24
25impl ContentResolver for PostgresContentStore {
26 fn resolve(
27 &self,
28 request: ContentResolveRequest,
29 policy: ContentResolutionPolicy,
30 ) -> Result<agent_sdk_core::ResolvedContent, ContentResolutionError> {
31 if request.requested_version != request.content_ref.version {
32 return Err(content_error(
33 ContentResolutionErrorKind::VersionMismatch,
34 request.content_ref,
35 policy.policy_refs,
36 ));
37 }
38 let response = self
39 .client
40 .execute(
41 format!("select content_json, bytes_base64 from {} where store_scope = $1 and content_id = $2", self.client.table("agent_sdk_content")),
42 vec![self.client.scope(), Value::String(request.content_ref.content_id.as_str().to_string())],
43 )
44 .map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, request.content_ref.clone(), policy.policy_refs.clone()))?;
45 let row = response.rows.into_iter().next().ok_or_else(|| {
46 content_error(
47 ContentResolutionErrorKind::Missing,
48 request.content_ref.clone(),
49 policy.policy_refs.clone(),
50 )
51 })?;
52 let content_ref: ContentRef = decode_row(row.clone(), "content_json").map_err(|_| {
53 content_error(
54 ContentResolutionErrorKind::StorageUnavailable,
55 request.content_ref.clone(),
56 policy.policy_refs.clone(),
57 )
58 })?;
59 if !policy.allow_raw_content {
60 return Ok(CoreResolvedContent::redacted(
61 content_ref,
62 policy.policy_refs,
63 ));
64 }
65 let bytes_base64 = row
66 .get("bytes_base64")
67 .and_then(Value::as_str)
68 .unwrap_or_default();
69 let bytes = BASE64.decode(bytes_base64).map_err(|_| {
70 content_error(
71 ContentResolutionErrorKind::StorageUnavailable,
72 request.content_ref.clone(),
73 policy.policy_refs.clone(),
74 )
75 })?;
76 if bytes.len() as u64 > policy.max_bytes {
77 return Err(content_error(
78 ContentResolutionErrorKind::MaxBytesExceeded,
79 request.content_ref,
80 policy.policy_refs,
81 ));
82 }
83 Ok(agent_sdk_core::ResolvedContent {
84 mime: content_ref.mime.clone(),
85 redacted_summary: content_ref.redacted_summary.clone(),
86 content_ref,
87 bytes: Some(bytes),
88 policy_refs: policy.policy_refs,
89 raw_content_included: true,
90 })
91 }
92
93 fn store_resolved_content(
94 &self,
95 content_ref: &ContentRef,
96 bytes: Vec<u8>,
97 ) -> Result<(), ContentResolutionError> {
98 self.put_content(content_ref, bytes)
99 }
100}
101
102impl ContentStore for PostgresContentStore {
103 fn put_content(
104 &self,
105 content_ref: &ContentRef,
106 bytes: Vec<u8>,
107 ) -> Result<(), ContentResolutionError> {
108 self.client
109 .execute(
110 format!("insert into {} (store_scope, content_id, content_json, bytes_base64) values ($1, $2, $3, $4) on conflict (store_scope, content_id) do update set content_json = excluded.content_json, bytes_base64 = excluded.bytes_base64", self.client.table("agent_sdk_content")),
111 vec![
112 self.client.scope(),
113 Value::String(content_ref.content_id.as_str().to_string()),
114 serde_json::to_value(content_ref).map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, content_ref.clone(), Vec::new()))?,
115 Value::String(BASE64.encode(bytes)),
116 ],
117 )
118 .map_err(|_| content_error(ContentResolutionErrorKind::StorageUnavailable, content_ref.clone(), Vec::new()))?;
119 Ok(())
120 }
121}