Skip to main content

agentzero_core/
canvas.rs

1//! Canvas store for real-time rich content delivery.
2//!
3//! Agents push content via `CanvasTool` and the gateway exposes REST + WebSocket
4//! endpoints for clients to consume it. The store is scoped per canvas ID.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use tokio::sync::RwLock;
11
12/// Maximum content size per frame (256 KiB).
13pub const MAX_CONTENT_BYTES: usize = 262_144;
14
15/// Maximum history frames per canvas.
16pub const MAX_HISTORY_FRAMES: usize = 100;
17
18/// Allowed content types for canvas rendering.
19const ALLOWED_CONTENT_TYPES: &[&str] =
20    &["text/html", "image/svg+xml", "text/markdown", "text/plain"];
21
22/// A single frame of canvas content.
23#[derive(Debug, Clone, serde::Serialize)]
24pub struct CanvasFrame {
25    pub content_type: String,
26    pub content: String,
27    pub timestamp: u64,
28}
29
30/// A canvas instance with current content and history.
31#[derive(Debug, Clone, serde::Serialize)]
32pub struct Canvas {
33    pub id: String,
34    pub current: Option<CanvasFrame>,
35    pub history: VecDeque<CanvasFrame>,
36    pub created_at: u64,
37}
38
39/// Summary info for listing canvases.
40#[derive(Debug, Clone, serde::Serialize)]
41pub struct CanvasSummary {
42    pub id: String,
43    pub content_type: Option<String>,
44    pub frame_count: usize,
45    pub created_at: u64,
46}
47
48/// Thread-safe store for canvas state. Shared between the tool and gateway.
49#[derive(Debug, Clone)]
50pub struct CanvasStore {
51    canvases: Arc<RwLock<HashMap<String, Canvas>>>,
52    /// Broadcast sender for real-time updates. Each message contains (canvas_id, frame).
53    update_tx: tokio::sync::broadcast::Sender<(String, CanvasFrame)>,
54}
55
56impl CanvasStore {
57    /// Create a new empty canvas store.
58    pub fn new() -> Self {
59        let (update_tx, _) = tokio::sync::broadcast::channel(256);
60        Self {
61            canvases: Arc::new(RwLock::new(HashMap::new())),
62            update_tx,
63        }
64    }
65
66    /// List all active canvases.
67    pub async fn list(&self) -> Vec<CanvasSummary> {
68        let canvases = self.canvases.read().await;
69        let mut summaries: Vec<CanvasSummary> = canvases
70            .values()
71            .map(|c| CanvasSummary {
72                id: c.id.clone(),
73                content_type: c.current.as_ref().map(|f| f.content_type.clone()),
74                frame_count: c.history.len() + usize::from(c.current.is_some()),
75                created_at: c.created_at,
76            })
77            .collect();
78        summaries.sort_by_key(|s| s.created_at);
79        summaries
80    }
81
82    /// Get current snapshot of a canvas.
83    pub async fn snapshot(&self, id: &str) -> Option<Canvas> {
84        let canvases = self.canvases.read().await;
85        canvases.get(id).cloned()
86    }
87
88    /// Get frame history for a canvas.
89    pub async fn history(&self, id: &str) -> Option<VecDeque<CanvasFrame>> {
90        let canvases = self.canvases.read().await;
91        canvases.get(id).map(|c| c.history.clone())
92    }
93
94    /// Render content to a canvas.
95    ///
96    /// Creates the canvas if it does not exist. Pushes the previous current frame
97    /// into history (capped at [`MAX_HISTORY_FRAMES`]) and broadcasts the update.
98    ///
99    /// Returns an error if the content type is not in [`ALLOWED_CONTENT_TYPES`] or
100    /// the content exceeds [`MAX_CONTENT_BYTES`].
101    pub async fn render(&self, id: &str, content_type: &str, content: &str) -> anyhow::Result<()> {
102        if !ALLOWED_CONTENT_TYPES.contains(&content_type) {
103            anyhow::bail!(
104                "invalid content type '{}'; allowed: {:?}",
105                content_type,
106                ALLOWED_CONTENT_TYPES
107            );
108        }
109
110        if content.len() > MAX_CONTENT_BYTES {
111            anyhow::bail!(
112                "content size {} exceeds maximum {} bytes",
113                content.len(),
114                MAX_CONTENT_BYTES
115            );
116        }
117
118        let frame = CanvasFrame {
119            content_type: content_type.to_string(),
120            content: content.to_string(),
121            timestamp: now(),
122        };
123
124        let mut canvases = self.canvases.write().await;
125        let canvas = canvases.entry(id.to_string()).or_insert_with(|| Canvas {
126            id: id.to_string(),
127            current: None,
128            history: VecDeque::new(),
129            created_at: now(),
130        });
131
132        // Push old current frame into history.
133        if let Some(old) = canvas.current.take() {
134            canvas.history.push_back(old);
135            while canvas.history.len() > MAX_HISTORY_FRAMES {
136                canvas.history.pop_front();
137            }
138        }
139
140        canvas.current = Some(frame.clone());
141
142        // Broadcast; ignore error (no active receivers is fine).
143        let _ = self.update_tx.send((id.to_string(), frame));
144
145        Ok(())
146    }
147
148    /// Clear a canvas (remove current content, keep history).
149    ///
150    /// Returns `true` if the canvas existed, `false` otherwise.
151    pub async fn clear(&self, id: &str) -> bool {
152        let mut canvases = self.canvases.write().await;
153        if let Some(canvas) = canvases.get_mut(id) {
154            if let Some(old) = canvas.current.take() {
155                canvas.history.push_back(old);
156                while canvas.history.len() > MAX_HISTORY_FRAMES {
157                    canvas.history.pop_front();
158                }
159            }
160            true
161        } else {
162            false
163        }
164    }
165
166    /// Subscribe to canvas updates (for WebSocket streaming).
167    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<(String, CanvasFrame)> {
168        self.update_tx.subscribe()
169    }
170}
171
172impl Default for CanvasStore {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178fn now() -> u64 {
179    SystemTime::now()
180        .duration_since(UNIX_EPOCH)
181        .unwrap_or_default()
182        .as_secs()
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188
189    #[tokio::test]
190    async fn render_creates_canvas() {
191        let store = CanvasStore::new();
192        store
193            .render("c1", "text/html", "<h1>hi</h1>")
194            .await
195            .expect("render should succeed");
196
197        let snap = store.snapshot("c1").await.expect("canvas should exist");
198        assert_eq!(snap.id, "c1");
199        assert!(snap.current.is_some());
200        assert_eq!(
201            snap.current.as_ref().expect("current frame exists").content,
202            "<h1>hi</h1>"
203        );
204    }
205
206    #[tokio::test]
207    async fn render_updates_existing() {
208        let store = CanvasStore::new();
209        store
210            .render("c1", "text/plain", "first")
211            .await
212            .expect("first render");
213        store
214            .render("c1", "text/plain", "second")
215            .await
216            .expect("second render");
217
218        let snap = store.snapshot("c1").await.expect("canvas should exist");
219        assert_eq!(
220            snap.current.as_ref().expect("current frame exists").content,
221            "second"
222        );
223        assert_eq!(snap.history.len(), 1);
224        assert_eq!(snap.history[0].content, "first");
225    }
226
227    #[tokio::test]
228    async fn snapshot_returns_current() {
229        let store = CanvasStore::new();
230        store
231            .render("s1", "text/markdown", "# Title")
232            .await
233            .expect("render");
234
235        let snap = store.snapshot("s1").await.expect("canvas exists");
236        let frame = snap.current.expect("has current");
237        assert_eq!(frame.content_type, "text/markdown");
238        assert_eq!(frame.content, "# Title");
239    }
240
241    #[tokio::test]
242    async fn list_returns_summaries() {
243        let store = CanvasStore::new();
244        store
245            .render("a", "text/plain", "hello")
246            .await
247            .expect("render a");
248        store
249            .render("b", "text/html", "<p>world</p>")
250            .await
251            .expect("render b");
252
253        let list = store.list().await;
254        assert_eq!(list.len(), 2);
255
256        let ids: Vec<&str> = list.iter().map(|s| s.id.as_str()).collect();
257        assert!(ids.contains(&"a"));
258        assert!(ids.contains(&"b"));
259    }
260
261    #[tokio::test]
262    async fn history_truncates_at_max() {
263        let store = CanvasStore::new();
264        let total = MAX_HISTORY_FRAMES + 5;
265        for i in 0..=total {
266            store
267                .render("h", "text/plain", &format!("frame-{i}"))
268                .await
269                .expect("render");
270        }
271
272        let hist = store.history("h").await.expect("canvas exists");
273        assert_eq!(hist.len(), MAX_HISTORY_FRAMES);
274    }
275
276    #[tokio::test]
277    async fn clear_removes_current() {
278        let store = CanvasStore::new();
279        store
280            .render("cl", "text/plain", "data")
281            .await
282            .expect("render");
283
284        let cleared = store.clear("cl").await;
285        assert!(cleared);
286
287        let snap = store.snapshot("cl").await.expect("canvas still exists");
288        assert!(snap.current.is_none());
289        assert_eq!(snap.history.len(), 1);
290    }
291
292    #[tokio::test]
293    async fn clear_nonexistent_returns_false() {
294        let store = CanvasStore::new();
295        assert!(!store.clear("nope").await);
296    }
297
298    #[tokio::test]
299    async fn invalid_content_type_rejected() {
300        let store = CanvasStore::new();
301        let err = store
302            .render("bad", "application/octet-stream", "bytes")
303            .await;
304        assert!(err.is_err());
305        let msg = format!("{}", err.expect_err("should be error"));
306        assert!(msg.contains("invalid content type"));
307    }
308
309    #[tokio::test]
310    async fn oversized_content_rejected() {
311        let store = CanvasStore::new();
312        let big = "x".repeat(MAX_CONTENT_BYTES + 1);
313        let err = store.render("big", "text/plain", &big).await;
314        assert!(err.is_err());
315        let msg = format!("{}", err.expect_err("should be error"));
316        assert!(msg.contains("exceeds maximum"));
317    }
318
319    #[tokio::test]
320    async fn subscribe_receives_updates() {
321        let store = CanvasStore::new();
322        let mut rx = store.subscribe();
323
324        store
325            .render("sub", "text/plain", "hello")
326            .await
327            .expect("render");
328
329        let (id, frame) = rx.recv().await.expect("should receive update");
330        assert_eq!(id, "sub");
331        assert_eq!(frame.content, "hello");
332    }
333}