1use std::sync::Mutex;
8
9use crate::protocol::{
10 ConsoleEntry, ErrorEntry, Filters, LogEntry, NetworkEntry, QueryResponse, SessionInfo,
11 generate_id, now_millis,
12};
13
14pub const CONSOLE_CAPACITY: usize = 500;
16pub const ERROR_CAPACITY: usize = 100;
18pub const NETWORK_CAPACITY: usize = 300;
20pub const APP_CAPACITY: usize = 50;
22
23pub struct RingBuffer<T> {
26 buf: Vec<Option<T>>,
27 head: usize,
28 size: usize,
29 capacity: usize,
30}
31
32impl<T: Clone> RingBuffer<T> {
33 pub fn new(capacity: usize) -> Self {
35 let mut buf = Vec::with_capacity(capacity);
36 buf.resize_with(capacity, || None);
37 Self {
38 buf,
39 head: 0,
40 size: 0,
41 capacity,
42 }
43 }
44
45 pub fn push(&mut self, item: T) {
47 if self.size == self.capacity {
48 self.buf[self.head] = Some(item);
49 self.head = (self.head + 1) % self.capacity;
50 } else {
51 let idx = (self.head + self.size) % self.capacity;
52 self.buf[idx] = Some(item);
53 self.size += 1;
54 }
55 }
56
57 pub fn to_vec(&self) -> Vec<T> {
59 let mut result = Vec::with_capacity(self.size);
60 for i in 0..self.size {
61 if let Some(item) = &self.buf[(self.head + i) % self.capacity] {
62 result.push(item.clone());
63 }
64 }
65 result
66 }
67
68 pub fn filter<F: Fn(&T) -> bool>(&self, pred: F) -> Vec<T> {
70 let mut result = Vec::new();
71 for i in 0..self.size {
72 if let Some(item) = &self.buf[(self.head + i) % self.capacity]
73 && pred(item)
74 {
75 result.push(item.clone());
76 }
77 }
78 result
79 }
80
81 pub fn clear(&mut self) {
83 self.head = 0;
84 self.size = 0;
85 for slot in &mut self.buf {
86 *slot = None;
87 }
88 }
89
90 pub fn len(&self) -> usize {
92 self.size
93 }
94
95 pub fn is_empty(&self) -> bool {
97 self.size == 0
98 }
99}
100
101pub struct LogStore {
106 console: Mutex<RingBuffer<serde_json::Value>>,
107 errors: Mutex<RingBuffer<serde_json::Value>>,
108 network: Mutex<RingBuffer<serde_json::Value>>,
109 app: Mutex<RingBuffer<serde_json::Value>>,
110 session: SessionInfo,
111}
112
113impl LogStore {
114 pub fn new(session: SessionInfo) -> Self {
116 Self {
117 console: Mutex::new(RingBuffer::new(CONSOLE_CAPACITY)),
118 errors: Mutex::new(RingBuffer::new(ERROR_CAPACITY)),
119 network: Mutex::new(RingBuffer::new(NETWORK_CAPACITY)),
120 app: Mutex::new(RingBuffer::new(APP_CAPACITY)),
121 session,
122 }
123 }
124
125 pub fn push(&self, mut entry: LogEntry) {
129 if entry.id().is_none() {
130 entry.set_id(generate_id());
131 }
132
133 let value = match serde_json::to_value(&entry) {
136 Ok(v) => v,
137 Err(_) => return,
138 };
139
140 match entry {
141 LogEntry::Console(_) => {
142 if let Ok(mut buf) = self.console.lock() {
143 buf.push(value);
144 }
145 }
146 LogEntry::Error(_) => {
147 if let Ok(mut buf) = self.errors.lock() {
148 buf.push(value);
149 }
150 }
151 LogEntry::Network(_) => {
152 if let Ok(mut buf) = self.network.lock() {
153 buf.push(value);
154 }
155 }
156 LogEntry::App(_) => {
157 if let Ok(mut buf) = self.app.lock() {
158 buf.push(value);
159 }
160 }
161 }
162 }
163
164 pub fn push_console(&self, level: &str, args: Vec<serde_json::Value>, source: &str) {
166 self.push(LogEntry::Console(ConsoleEntry {
167 id: None,
168 level: level.to_string(),
169 args,
170 timestamp: now_millis(),
171 source: source.to_string(),
172 }));
173 }
174
175 pub fn push_error(&self, message: &str, stack: Option<String>, source: &str) {
177 self.push(LogEntry::Error(ErrorEntry {
178 id: None,
179 message: message.to_string(),
180 stack,
181 timestamp: now_millis(),
182 source: source.to_string(),
183 url: None,
184 component: None,
185 }));
186 }
187
188 pub fn push_network(&self, entry: NetworkEntry) {
190 self.push(LogEntry::Network(entry));
191 }
192
193 pub fn session(&self) -> &SessionInfo {
195 &self.session
196 }
197
198 pub fn query(&self, command: &str, filters: &Filters) -> QueryResponse {
200 let data = match command {
201 "console" => self.query_console(filters),
202 "errors" => self.query_errors(filters),
203 "network" => self.query_network(filters),
204 "app" => self.query_app(),
205 "all" => self.query_all(filters),
206 "status" | "push" => Vec::new(),
207 _ => Vec::new(),
208 };
209
210 let data = apply_common_filters(data, filters);
211
212 QueryResponse {
213 id: String::new(),
214 ok: true,
215 data,
216 session: Some(self.session.clone()),
217 error: None,
218 }
219 }
220
221 fn query_console(&self, filters: &Filters) -> Vec<serde_json::Value> {
222 let buf = match self.console.lock() {
223 Ok(b) => b,
224 Err(_) => return Vec::new(),
225 };
226 buf.filter(|v| {
227 if let Some(ref level) = filters.level
228 && v.get("level").and_then(|l| l.as_str()) != Some(level.as_str())
229 {
230 return false;
231 }
232 if let Some(ref source) = filters.source
233 && v.get("source").and_then(|s| s.as_str()) != Some(source.as_str())
234 {
235 return false;
236 }
237 true
238 })
239 }
240
241 fn query_errors(&self, filters: &Filters) -> Vec<serde_json::Value> {
242 let buf = match self.errors.lock() {
243 Ok(b) => b,
244 Err(_) => return Vec::new(),
245 };
246 buf.filter(|v| {
247 if let Some(ref source) = filters.source
248 && v.get("source").and_then(|s| s.as_str()) != Some(source.as_str())
249 {
250 return false;
251 }
252 true
253 })
254 }
255
256 fn query_network(&self, filters: &Filters) -> Vec<serde_json::Value> {
257 let buf = match self.network.lock() {
258 Ok(b) => b,
259 Err(_) => return Vec::new(),
260 };
261 buf.filter(|v| {
262 if let Some(status) = filters.status
263 && v.get("status").and_then(|s| s.as_f64()) != Some(status)
264 {
265 return false;
266 }
267 if let Some(failed) = filters.failed
268 && v.get("failed").and_then(|f| f.as_bool()) != Some(failed)
269 {
270 return false;
271 }
272 true
273 })
274 }
275
276 fn query_app(&self) -> Vec<serde_json::Value> {
277 match self.app.lock() {
278 Ok(buf) => buf.to_vec(),
279 Err(_) => Vec::new(),
280 }
281 }
282
283 fn query_all(&self, filters: &Filters) -> Vec<serde_json::Value> {
284 let mut all = Vec::new();
285 all.extend(self.query_console(filters));
286 all.extend(self.query_errors(filters));
287 all.extend(self.query_network(filters));
288 all.extend(self.query_app());
289
290 all.sort_by(|a, b| {
291 let ta = a.get("timestamp").and_then(|t| t.as_i64()).unwrap_or(0);
292 let tb = b.get("timestamp").and_then(|t| t.as_i64()).unwrap_or(0);
293 ta.cmp(&tb)
294 });
295 all
296 }
297}
298
299fn apply_common_filters(
301 mut entries: Vec<serde_json::Value>,
302 filters: &Filters,
303) -> Vec<serde_json::Value> {
304 if let Some(ref id) = filters.id {
306 for entry in &entries {
307 if entry.get("id").and_then(|i| i.as_str()) == Some(id.as_str()) {
308 return vec![entry.clone()];
309 }
310 }
311 return Vec::new();
312 }
313
314 if let Some(ref ids) = filters.ids {
316 let id_set: std::collections::HashSet<&str> = ids.iter().map(|s| s.as_str()).collect();
317 entries.retain(|e| {
318 e.get("id")
319 .and_then(|i| i.as_str())
320 .is_some_and(|id| id_set.contains(id))
321 });
322 }
323
324 if let Some(last) = filters.last {
326 let cutoff = now_millis() - last as i64;
327 entries.retain(|e| {
328 e.get("timestamp")
329 .and_then(|t| t.as_i64())
330 .is_some_and(|ts| ts >= cutoff)
331 });
332 }
333
334 if let Some(limit) = filters.limit
336 && limit < entries.len()
337 {
338 entries = entries.split_off(entries.len() - limit);
339 }
340
341 entries
342}
343
344#[cfg(test)]
345mod tests {
346 use super::*;
347 use crate::protocol::create_session;
348
349 #[test]
350 fn ring_buffer_basic() {
351 let mut rb: RingBuffer<i32> = RingBuffer::new(3);
352 assert!(rb.is_empty());
353 rb.push(1);
354 rb.push(2);
355 rb.push(3);
356 assert_eq!(rb.len(), 3);
357 assert_eq!(rb.to_vec(), vec![1, 2, 3]);
358 }
359
360 #[test]
361 fn ring_buffer_overflow() {
362 let mut rb: RingBuffer<i32> = RingBuffer::new(3);
363 rb.push(1);
364 rb.push(2);
365 rb.push(3);
366 rb.push(4);
367 assert_eq!(rb.len(), 3);
368 assert_eq!(rb.to_vec(), vec![2, 3, 4]);
369 }
370
371 #[test]
372 fn ring_buffer_filter() {
373 let mut rb: RingBuffer<i32> = RingBuffer::new(5);
374 rb.push(1);
375 rb.push(2);
376 rb.push(3);
377 rb.push(4);
378 assert_eq!(rb.filter(|x| x % 2 == 0), vec![2, 4]);
379 }
380
381 #[test]
382 fn ring_buffer_clear() {
383 let mut rb: RingBuffer<i32> = RingBuffer::new(3);
384 rb.push(1);
385 rb.push(2);
386 rb.clear();
387 assert!(rb.is_empty());
388 assert_eq!(rb.to_vec(), Vec::<i32>::new());
389 }
390
391 #[test]
392 fn store_push_and_query_console() {
393 let session = create_session("test", 3000);
394 let store = LogStore::new(session);
395
396 store.push_console("info", vec![serde_json::json!("hello")], "server");
397 store.push_console("error", vec![serde_json::json!("bad")], "server");
398
399 let resp = store.query("console", &Filters::default());
400 assert!(resp.ok);
401 assert_eq!(resp.data.len(), 2);
402
403 let resp = store.query(
404 "console",
405 &Filters {
406 level: Some("error".to_string()),
407 ..Default::default()
408 },
409 );
410 assert_eq!(resp.data.len(), 1);
411 }
412
413 #[test]
414 fn store_query_all() {
415 let session = create_session("test", 3000);
416 let store = LogStore::new(session);
417
418 store.push_console("info", vec![serde_json::json!("hello")], "server");
419 store.push_error("oops", None, "server");
420
421 let resp = store.query("all", &Filters::default());
422 assert!(resp.ok);
423 assert_eq!(resp.data.len(), 2);
424 }
425
426 #[test]
427 fn store_query_with_limit() {
428 let session = create_session("test", 3000);
429 let store = LogStore::new(session);
430
431 for i in 0..10 {
432 store.push_console(
433 "info",
434 vec![serde_json::json!(format!("msg {i}"))],
435 "server",
436 );
437 }
438
439 let resp = store.query(
440 "console",
441 &Filters {
442 limit: Some(3),
443 ..Default::default()
444 },
445 );
446 assert_eq!(resp.data.len(), 3);
447 }
448}