arcp_runtime/runtime/
artifact.rs1use std::sync::Arc;
7use std::time::{Duration, SystemTime};
8
9use base64::engine::general_purpose::STANDARD as B64;
10use base64::Engine as _;
11use dashmap::DashMap;
12
13use arcp_core::error::ARCPError;
14use arcp_core::ids::ArtifactId;
15use arcp_core::messages::ArtifactRef;
16
17#[derive(Clone, Default)]
19pub struct ArtifactStore {
20 inner: Arc<DashMap<ArtifactId, StoredArtifact>>,
21 default_retention: Duration,
22}
23
24impl std::fmt::Debug for ArtifactStore {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("ArtifactStore")
27 .field("len", &self.inner.len())
28 .field("default_retention_secs", &self.default_retention.as_secs())
29 .finish()
30 }
31}
32
/// One artifact as held inside the store.
#[derive(Clone, Debug)]
struct StoredArtifact {
    /// Media type recorded when the artifact was stored.
    media_type: String,
    /// Decoded (raw) artifact body.
    bytes: Vec<u8>,
    /// Digest string supplied by the caller at `put` time, if any.
    /// It is stored but never read back in this file, hence the lint allowance.
    #[allow(dead_code)]
    sha256: Option<String>,
    /// Instant after which the artifact counts as expired; `None` means no expiry.
    expires_at: Option<SystemTime>,
}
43
/// Largest decoded artifact body that `put` accepts: 4 MiB.
const MAX_INLINE_BYTES: usize = 4 << 20;
46
47impl ArtifactStore {
48 #[must_use]
50 pub fn new() -> Self {
51 Self {
52 inner: Arc::new(DashMap::new()),
53 default_retention: Duration::from_secs(3600),
54 }
55 }
56
57 #[must_use]
59 pub const fn with_default_retention(mut self, duration: Duration) -> Self {
60 self.default_retention = duration;
61 self
62 }
63
64 pub fn put(
72 &self,
73 media_type: impl Into<String>,
74 data: &str,
75 retain_seconds: Option<u64>,
76 sha256: Option<String>,
77 ) -> Result<ArtifactRef, ARCPError> {
78 let bytes = B64.decode(data).map_err(|e| ARCPError::InvalidArgument {
79 detail: format!("invalid base64 in artifact.put: {e}"),
80 })?;
81 if bytes.len() > MAX_INLINE_BYTES {
82 return Err(ARCPError::InvalidArgument {
83 detail: format!(
84 "artifact body exceeds {MAX_INLINE_BYTES} bytes (got {})",
85 bytes.len()
86 ),
87 });
88 }
89 let id = ArtifactId::new();
90 let media_type = media_type.into();
91 let expires_at = retain_seconds
92 .map(Duration::from_secs)
93 .or(Some(self.default_retention))
94 .map(|d| SystemTime::now() + d);
95 let size = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
96 let stored = StoredArtifact {
97 media_type: media_type.clone(),
98 bytes,
99 sha256: sha256.clone(),
100 expires_at,
101 };
102 self.inner.insert(id.clone(), stored);
103 Ok(ArtifactRef {
104 artifact_id: id.clone(),
105 uri: format!("arcp://artifact/{id}"),
106 media_type,
107 size,
108 sha256,
109 expires_at: expires_at.map(chrono::DateTime::<chrono::Utc>::from),
110 })
111 }
112
113 pub fn fetch(&self, id: &ArtifactId) -> Result<(String, String), ARCPError> {
121 if let Some(entry) = self.inner.get(id) {
122 if entry.expires_at.is_some_and(|t| SystemTime::now() > t) {
123 drop(entry);
124 self.inner.remove(id);
125 return Err(ARCPError::NotFound {
126 kind: "artifact",
127 id: id.to_string(),
128 });
129 }
130 let body = B64.encode(&entry.bytes);
131 Ok((body, entry.media_type.clone()))
132 } else {
133 Err(ARCPError::NotFound {
134 kind: "artifact",
135 id: id.to_string(),
136 })
137 }
138 }
139
140 pub fn release(&self, id: &ArtifactId) {
142 self.inner.remove(id);
143 }
144
145 #[must_use]
147 pub fn len(&self) -> usize {
148 self.inner.len()
149 }
150
151 #[must_use]
153 pub fn is_empty(&self) -> bool {
154 self.inner.is_empty()
155 }
156
157 #[must_use]
159 pub fn sweep_expired(&self) -> usize {
160 let now = SystemTime::now();
161 let expired: Vec<ArtifactId> = self
162 .inner
163 .iter()
164 .filter_map(|r| {
165 if r.value().expires_at.is_some_and(|t| now > t) {
166 Some(r.key().clone())
167 } else {
168 None
169 }
170 })
171 .collect();
172 let n = expired.len();
173 for id in expired {
174 self.inner.remove(&id);
175 }
176 n
177 }
178}
179
#[cfg(test)]
#[allow(
    clippy::expect_used,
    clippy::unwrap_used,
    clippy::panic,
    clippy::missing_panics_doc
)]
mod tests {
    use super::*;

    /// A stored payload comes back with the same base64 body and media type.
    #[test]
    fn put_then_fetch_round_trips_bytes() {
        let payload: &[u8] = b"hello world";
        let encoded = B64.encode(payload);
        let store = ArtifactStore::new();

        let artifact = store
            .put("text/plain", &encoded, Some(60), None)
            .expect("put");
        assert!(artifact.uri.starts_with("arcp://artifact/art_"));
        assert_eq!(artifact.size, payload.len() as u64);

        let (fetched_body, fetched_media) = store.fetch(&artifact.artifact_id).expect("fetch");
        assert_eq!(fetched_body, encoded);
        assert_eq!(fetched_media, "text/plain");
    }

    /// Fetching an id that was never stored reports `NotFound` for an artifact.
    #[test]
    fn fetch_missing_returns_not_found() {
        let store = ArtifactStore::new();
        let unknown = ArtifactId::new();

        let outcome = store.fetch(&unknown);
        assert!(matches!(
            outcome,
            Err(ARCPError::NotFound {
                kind: "artifact",
                ..
            })
        ));
    }

    /// After `release`, the artifact can no longer be fetched.
    #[test]
    fn release_removes_artifact() {
        let store = ArtifactStore::new();
        let encoded = B64.encode(b"{}");
        let artifact = store
            .put("application/json", &encoded, None, None)
            .expect("put");

        store.release(&artifact.artifact_id);

        assert!(store.fetch(&artifact.artifact_id).is_err());
    }

    /// Input that is not base64 is rejected as an invalid argument.
    #[test]
    fn invalid_base64_rejected() {
        let store = ArtifactStore::new();

        let outcome = store.put("text/plain", "!!!not-base64", None, None);

        assert!(matches!(outcome, Err(ARCPError::InvalidArgument { .. })));
    }

    /// A body one byte over the inline limit is rejected as an invalid argument.
    #[test]
    fn oversize_payload_rejected() {
        let store = ArtifactStore::new();
        let encoded = B64.encode(vec![0u8; MAX_INLINE_BYTES + 1]);

        let outcome = store.put("application/octet-stream", &encoded, None, None);

        assert!(matches!(outcome, Err(ARCPError::InvalidArgument { .. })));
    }
}