1use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "lowercase")]
21pub enum ActivityType {
22 Query,
23 Write,
24 Delete,
25 Config,
26 Node,
27 Auth,
28 System,
29}
30
31impl std::fmt::Display for ActivityType {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 ActivityType::Query => write!(f, "query"),
35 ActivityType::Write => write!(f, "write"),
36 ActivityType::Delete => write!(f, "delete"),
37 ActivityType::Config => write!(f, "config"),
38 ActivityType::Node => write!(f, "node"),
39 ActivityType::Auth => write!(f, "auth"),
40 ActivityType::System => write!(f, "system"),
41 }
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Activity {
48 pub id: String,
49 #[serde(rename = "type")]
50 pub activity_type: ActivityType,
51 pub description: String,
52 pub timestamp: String,
53 pub duration: Option<u64>,
54 pub user: Option<String>,
55 pub source: Option<String>,
56 pub details: Option<serde_json::Value>,
57}
58
59#[derive(Debug, Clone)]
61struct ActivityRecord {
62 activity: Activity,
63 created_at: Instant,
64}
65
66pub struct ActivityLogger {
72 activities: RwLock<VecDeque<ActivityRecord>>,
73 next_id: AtomicU64,
74 max_entries: usize,
75 retention_duration: Duration,
76}
77
78impl ActivityLogger {
79 pub fn new() -> Self {
81 Self {
82 activities: RwLock::new(VecDeque::with_capacity(1000)),
83 next_id: AtomicU64::new(1),
84 max_entries: 1000,
85 retention_duration: Duration::from_secs(24 * 60 * 60), }
87 }
88
89 pub fn log(&self, activity_type: ActivityType, description: &str) -> String {
91 self.log_with_details(activity_type, description, None, None, None, None)
92 }
93
94 pub fn log_query(&self, sql: &str, duration_ms: u64, user: Option<&str>) {
96 self.log_with_details(
97 ActivityType::Query,
98 sql,
99 Some(duration_ms),
100 user,
101 None,
102 None,
103 );
104 }
105
106 pub fn log_write(&self, description: &str, user: Option<&str>) {
108 self.log_with_details(
109 ActivityType::Write,
110 description,
111 None,
112 user,
113 None,
114 None,
115 );
116 }
117
118 pub fn log_config(&self, description: &str, user: Option<&str>) {
120 self.log_with_details(
121 ActivityType::Config,
122 description,
123 None,
124 user,
125 None,
126 None,
127 );
128 }
129
130 pub fn log_node(&self, description: &str) {
132 self.log_with_details(
133 ActivityType::Node,
134 description,
135 None,
136 None,
137 Some("cluster"),
138 None,
139 );
140 }
141
142 pub fn log_auth(&self, description: &str, user: Option<&str>) {
144 self.log_with_details(
145 ActivityType::Auth,
146 description,
147 None,
148 user,
149 None,
150 None,
151 );
152 }
153
154 pub fn log_system(&self, description: &str) {
156 self.log_with_details(
157 ActivityType::System,
158 description,
159 None,
160 None,
161 Some("system"),
162 None,
163 );
164 }
165
166 pub fn log_with_details(
168 &self,
169 activity_type: ActivityType,
170 description: &str,
171 duration: Option<u64>,
172 user: Option<&str>,
173 source: Option<&str>,
174 details: Option<serde_json::Value>,
175 ) -> String {
176 let id = format!("act-{:08}", self.next_id.fetch_add(1, Ordering::SeqCst));
177 let activity = Activity {
178 id: id.clone(),
179 activity_type,
180 description: description.to_string(),
181 timestamp: format_timestamp(now_timestamp()),
182 duration,
183 user: user.map(|s| s.to_string()),
184 source: source.map(|s| s.to_string()),
185 details,
186 };
187
188 let record = ActivityRecord {
189 activity,
190 created_at: Instant::now(),
191 };
192
193 let mut activities = self.activities.write();
194
195 while activities.len() >= self.max_entries {
197 activities.pop_front();
198 }
199
200 activities.push_back(record);
201 id
202 }
203
204 pub fn get_recent(&self, limit: usize) -> Vec<Activity> {
206 self.cleanup_expired();
207
208 let activities = self.activities.read();
209 activities
210 .iter()
211 .rev()
212 .take(limit)
213 .map(|r| r.activity.clone())
214 .collect()
215 }
216
217 pub fn get_by_type(&self, activity_type: ActivityType, limit: usize) -> Vec<Activity> {
219 self.cleanup_expired();
220
221 let activities = self.activities.read();
222 activities
223 .iter()
224 .rev()
225 .filter(|r| r.activity.activity_type == activity_type)
226 .take(limit)
227 .map(|r| r.activity.clone())
228 .collect()
229 }
230
231 pub fn get_by_user(&self, username: &str, limit: usize) -> Vec<Activity> {
233 self.cleanup_expired();
234
235 let activities = self.activities.read();
236 activities
237 .iter()
238 .rev()
239 .filter(|r| r.activity.user.as_deref() == Some(username))
240 .take(limit)
241 .map(|r| r.activity.clone())
242 .collect()
243 }
244
245 pub fn count(&self) -> usize {
247 self.activities.read().len()
248 }
249
250 fn cleanup_expired(&self) {
252 let now = Instant::now();
253 let mut activities = self.activities.write();
254
255 while let Some(front) = activities.front() {
256 if now.duration_since(front.created_at) > self.retention_duration {
257 activities.pop_front();
258 } else {
259 break;
260 }
261 }
262 }
263
264 pub fn clear(&self) {
266 self.activities.write().clear();
267 }
268}
269
270impl Default for ActivityLogger {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_timestamp() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => Default::default(),
    };
    // `as_millis` returns u128; the narrowing cast keeps the low 64 bits.
    elapsed.as_millis() as u64
}
287
/// Formats milliseconds since the Unix epoch as a UTC timestamp of the form
/// `YYYY-MM-DDTHH:MM:SSZ`.
///
/// Sub-second precision is discarded. The conversion is pure integer
/// arithmetic on the proleptic Gregorian calendar, so it cannot panic and
/// does a bounded amount of work for any `u64` input.
fn format_timestamp(timestamp_ms: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;
    // The Gregorian calendar repeats exactly every 400 years (146_097 days).
    const DAYS_PER_CYCLE: u64 = 146_097;
    const YEARS_PER_CYCLE: u64 = 400;

    let total_secs = timestamp_ms / 1000;
    let days_since_epoch = total_secs / SECS_PER_DAY;
    let secs_today = total_secs % SECS_PER_DAY;

    let hours = secs_today / 3600;
    let minutes = (secs_today % 3600) / 60;
    let seconds = secs_today % 60;

    // Skip whole 400-year cycles up front; the leap-year pattern is the same
    // in every cycle, so the loop below runs at most 399 times.
    let mut year = 1970 + YEARS_PER_CYCLE * (days_since_epoch / DAYS_PER_CYCLE);
    let mut remaining_days = days_since_epoch % DAYS_PER_CYCLE;

    loop {
        let days_in_year = if is_leap_year(year) { 366 } else { 365 };
        if remaining_days < days_in_year {
            break;
        }
        remaining_days -= days_in_year;
        year += 1;
    }

    let february = if is_leap_year(year) { 29 } else { 28 };
    let days_in_months: [u64; 12] = [31, february, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];

    // Walk the months, consuming days until the remainder fits in one.
    let mut month = 1u64;
    for &days in &days_in_months {
        if remaining_days < days {
            break;
        }
        remaining_days -= days;
        month += 1;
    }
    let day = remaining_days + 1;

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hours, minutes, seconds
    )
}

/// Returns `true` when `year` is a leap year in the Gregorian calendar:
/// divisible by 4 but not by 100, or divisible by 400.
fn is_leap_year(year: u64) -> bool {
    (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// The first entry gets the first sequential id and is counted.
    #[test]
    fn test_log_activity() {
        let logger = ActivityLogger::new();
        let id = logger.log(ActivityType::Query, "SELECT * FROM users");
        assert!(!id.is_empty());
        assert_eq!(id, "act-00000001");
        assert_eq!(logger.count(), 1);
    }

    /// `get_recent` honours the limit and returns newest entries first.
    #[test]
    fn test_get_recent() {
        let logger = ActivityLogger::new();
        logger.log(ActivityType::Query, "Query 1");
        logger.log(ActivityType::Write, "Write 1");
        logger.log(ActivityType::Query, "Query 2");

        let recent = logger.get_recent(2);
        assert_eq!(recent.len(), 2);
        assert_eq!(recent[0].description, "Query 2");
        assert_eq!(recent[1].description, "Write 1");
    }

    /// `get_by_type` returns only matching entries, newest first.
    #[test]
    fn test_get_by_type() {
        let logger = ActivityLogger::new();
        logger.log(ActivityType::Query, "Query 1");
        logger.log(ActivityType::Write, "Write 1");
        logger.log(ActivityType::Query, "Query 2");

        let queries = logger.get_by_type(ActivityType::Query, 10);
        assert_eq!(queries.len(), 2);
        assert_eq!(queries[0].description, "Query 2");
        assert_eq!(queries[1].description, "Query 1");
    }

    /// `get_by_user` matches on the user field and skips entries without one.
    #[test]
    fn test_get_by_user() {
        let logger = ActivityLogger::new();
        logger.log_query("SELECT 1", 5, Some("alice"));
        logger.log_write("insert row", Some("bob"));
        logger.log_system("startup");
        logger.log_auth("login", Some("alice"));

        let by_alice = logger.get_by_user("alice", 10);
        assert_eq!(by_alice.len(), 2);
        assert_eq!(by_alice[0].description, "login");
        assert_eq!(by_alice[1].description, "SELECT 1");
        assert!(logger.get_by_user("carol", 10).is_empty());
    }

    /// `log_query` stores the duration, user and query type.
    #[test]
    fn test_log_query_with_duration() {
        let logger = ActivityLogger::new();
        logger.log_query("SELECT * FROM metrics", 42, Some("admin"));

        let recent = logger.get_recent(1);
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].activity_type, ActivityType::Query);
        assert_eq!(recent[0].duration, Some(42));
        assert_eq!(recent[0].user, Some("admin".to_string()));
    }

    /// Once the cap is reached, the oldest entries are the ones evicted.
    #[test]
    fn test_max_entries() {
        let logger = ActivityLogger::new();

        for i in 0..1100 {
            logger.log(ActivityType::Query, &format!("Query {}", i));
        }

        assert_eq!(logger.count(), 1000);

        let recent = logger.get_recent(1000);
        assert_eq!(recent.len(), 1000);
        assert_eq!(recent[0].description, "Query 1099");
        assert_eq!(recent[999].description, "Query 100");
    }

    /// `clear` removes every stored entry.
    #[test]
    fn test_clear() {
        let logger = ActivityLogger::new();
        logger.log(ActivityType::System, "boot");
        logger.log(ActivityType::Node, "joined");
        logger.clear();

        assert_eq!(logger.count(), 0);
        assert!(logger.get_recent(10).is_empty());
    }

    /// `Display` prints the lowercase variant name.
    #[test]
    fn test_activity_type_display() {
        assert_eq!(ActivityType::Query.to_string(), "query");
        assert_eq!(ActivityType::System.to_string(), "system");
    }

    /// Timestamps are rendered as UTC, including leap days.
    #[test]
    fn test_format_timestamp() {
        assert_eq!(format_timestamp(0), "1970-01-01T00:00:00Z");
        assert_eq!(format_timestamp(951_782_400_000), "2000-02-29T00:00:00Z");
        assert_eq!(format_timestamp(1_700_000_000_000), "2023-11-14T22:13:20Z");
    }
}