drawing_demo/
drawing_demo.rs1use std::{collections::HashMap, net::SocketAddr, sync::Arc};
2
3use async_trait::async_trait;
4use axum::{Json, Router, response::Html, routing::get};
5use realtime::server::{
6 RealtimeError, RealtimeTokenVerifier, SessionAuth, SocketAppState, SocketServerHandle,
7};
8use serde::Serialize;
9use serde_json::{Value, json};
10
/// Socket address the demo binds to when `REALTIME_DRAWING_DEMO_ADDR` cannot be
/// read from the environment.
const DEFAULT_ADDR: &str = "127.0.0.1:4002";
/// Event name whose payloads are appended to the per-board chunk history.
const STROKE_EVENT: &str = "stroke.chunk";
/// Event name that asks the server to send a board's stored chunks to a user.
const SYNC_REQUEST_EVENT: &str = "board.sync.request";
/// Event name under which a board snapshot is delivered to the requesting user.
const SYNC_SNAPSHOT_EVENT: &str = "board.sync.snapshot";
/// Event name that discards the stored chunk history of a board.
const BOARD_CLEARED_EVENT: &str = "board.cleared";
/// Upper bound on retained chunks per board; older chunks are dropped beyond it.
const MAX_CHUNKS_PER_BOARD: usize = 12_000;
17
/// A hard-coded demo account: identity, display label, token and roles.
#[derive(Clone)]
struct DemoUser {
    /// Identifier copied into the `SessionAuth` created for this user's token.
    user_id: String,
    /// Human-readable name exposed through `DemoUserView`.
    label: String,
    /// Token string that `StaticTokenVerifier` maps to this user.
    token: String,
    /// Role names copied into the user's `SessionAuth`.
    roles: Vec<String>,
}
25
/// Serializable copy of a `DemoUser`, returned as JSON by the `/demo/users` route.
///
/// NOTE(review): `token` is serialized along with the other fields, so the
/// endpoint hands out usable tokens — presumably acceptable only because this
/// is a demo; confirm before reusing the pattern elsewhere.
#[derive(Serialize)]
struct DemoUserView {
    /// Same value as `DemoUser::user_id`.
    user_id: String,
    /// Same value as `DemoUser::label`.
    label: String,
    /// Same value as `DemoUser::token`.
    token: String,
    /// Same value as `DemoUser::roles`.
    roles: Vec<String>,
}
33
/// Token verifier backed by a fixed, in-memory table built once at startup.
#[derive(Clone)]
struct StaticTokenVerifier {
    /// Maps each known token to the `SessionAuth` it authenticates as.
    /// Held behind an `Arc`, so cloning the verifier shares the same table.
    sessions: Arc<HashMap<String, SessionAuth>>,
}
38
39impl StaticTokenVerifier {
40 fn new(users: &[DemoUser]) -> Self {
41 let sessions = users
42 .iter()
43 .map(|user| {
44 (
45 user.token.clone(),
46 SessionAuth {
47 user_id: user.user_id.clone(),
48 roles: user.roles.clone(),
49 },
50 )
51 })
52 .collect();
53 Self {
54 sessions: Arc::new(sessions),
55 }
56 }
57}
58
59#[async_trait]
60impl RealtimeTokenVerifier for StaticTokenVerifier {
61 async fn verify_token(&self, token: &str) -> Result<SessionAuth, RealtimeError> {
62 let token = token.trim();
63 if token.is_empty() {
64 return Err(RealtimeError::unauthorized("Missing token"));
65 }
66 self.sessions
67 .get(token)
68 .cloned()
69 .ok_or_else(|| RealtimeError::unauthorized("Invalid demo token"))
70 }
71}
72
73#[derive(Clone, Default)]
74struct BoardStore {
75 boards: Arc<std::sync::Mutex<HashMap<String, Vec<Value>>>>,
76}
77
78impl BoardStore {
79 fn append_chunk(&self, board_channel: &str, payload: Value) {
80 let mut boards = self.boards.lock().expect("board store mutex poisoned");
81 let entries = boards.entry(board_channel.to_string()).or_default();
82 entries.push(payload);
83 if entries.len() > MAX_CHUNKS_PER_BOARD {
84 let drain_len = entries.len().saturating_sub(MAX_CHUNKS_PER_BOARD);
85 entries.drain(0..drain_len);
86 }
87 }
88
89 fn snapshot(&self, board_channel: &str) -> Vec<Value> {
90 self.boards
91 .lock()
92 .expect("board store mutex poisoned")
93 .get(board_channel)
94 .cloned()
95 .unwrap_or_default()
96 }
97
98 fn clear_board(&self, board_channel: &str) {
99 self.boards
100 .lock()
101 .expect("board store mutex poisoned")
102 .remove(board_channel);
103 }
104}
105
106#[tokio::main]
107async fn main() -> Result<(), Box<dyn std::error::Error>> {
108 let users = demo_users();
109 let verifier = StaticTokenVerifier::new(&users);
110 let socket_server_handle = SocketServerHandle::spawn(Default::default());
111 let board_store = BoardStore::default();
112
113 {
114 let server = socket_server_handle.clone();
115 let board_store = board_store.clone();
116 socket_server_handle.on_events(move |channel, event, payload| {
117 if !channel.starts_with("board:") {
118 return;
119 }
120
121 match event.as_str() {
122 STROKE_EVENT => {
123 board_store.append_chunk(&channel, payload);
124 }
125 BOARD_CLEARED_EVENT => {
126 board_store.clear_board(&channel);
127 }
128 SYNC_REQUEST_EVENT => {
129 let Some(requester_user_id) = payload
130 .get("requester_user_id")
131 .and_then(Value::as_str)
132 .map(str::trim)
133 .filter(|value| !value.is_empty())
134 .map(|value| value.to_string())
135 else {
136 return;
137 };
138
139 let chunks = board_store.snapshot(&channel);
140 let chunk_count = chunks.len();
141 let snapshot_payload = json!({
142 "board_channel": channel,
143 "snapshot_version": 1,
144 "chunk_count": chunk_count,
145 "chunks": chunks,
146 });
147
148 let server = server.clone();
149 tokio::spawn(async move {
150 if let Err(err) = server
151 .send_event_to_user(
152 requester_user_id,
153 SYNC_SNAPSHOT_EVENT,
154 snapshot_payload,
155 )
156 .await
157 {
158 eprintln!("failed to send board snapshot: {err}");
159 }
160 });
161 }
162 _ => {}
163 }
164 });
165 }
166
167 let socket_app_state = Arc::new(SocketAppState::new(socket_server_handle, verifier));
168
169 let app = Router::new()
170 .route("/", get(index))
171 .route("/demo/users", get(demo_users_handler))
172 .nest("/api/v1", realtime::server::axum::router(socket_app_state));
173
174 let addr = demo_addr();
175 println!("realtime drawing demo listening on http://{addr}");
176 println!("open http://{addr} in your browser");
177
178 let listener = tokio::net::TcpListener::bind(addr).await?;
179 axum::serve(listener, app).await?;
180 Ok(())
181}
182
183fn demo_users() -> Vec<DemoUser> {
184 vec![
185 DemoUser {
186 user_id: "u-alice".to_string(),
187 label: "Alice".to_string(),
188 token: "demo-token-alice".to_string(),
189 roles: vec!["user".to_string()],
190 },
191 DemoUser {
192 user_id: "u-bob".to_string(),
193 label: "Bob".to_string(),
194 token: "demo-token-bob".to_string(),
195 roles: vec!["user".to_string()],
196 },
197 DemoUser {
198 user_id: "u-admin".to_string(),
199 label: "Admin".to_string(),
200 token: "demo-token-admin".to_string(),
201 roles: vec!["admin".to_string(), "user".to_string()],
202 },
203 ]
204}
205
206fn demo_addr() -> SocketAddr {
207 let raw =
208 std::env::var("REALTIME_DRAWING_DEMO_ADDR").unwrap_or_else(|_| DEFAULT_ADDR.to_string());
209 raw.parse()
210 .unwrap_or_else(|_| panic!("invalid REALTIME_DRAWING_DEMO_ADDR: {raw}"))
211}
212
/// Handler for `/`: responds with the embedded `INDEX_HTML` page as HTML.
async fn index() -> Html<&'static str> {
    Html(INDEX_HTML)
}
216
217async fn demo_users_handler() -> Json<Vec<DemoUserView>> {
218 let users = demo_users()
219 .iter()
220 .map(|user| DemoUserView {
221 user_id: user.user_id.clone(),
222 label: user.label.clone(),
223 token: user.token.clone(),
224 roles: user.roles.clone(),
225 })
226 .collect();
227 Json(users)
228}
229
/// Contents of `views/drawing_demo.html`, embedded into the binary at compile
/// time by `include_str!` (the path is resolved relative to this source file).
const INDEX_HTML: &str = include_str!("views/drawing_demo.html");