1use crate::commands::{Command, CommandResult, OutputFormat};
4use crate::config::Config;
5use crate::errors::{CliError, prelude::Result as CliResult};
6use async_trait::async_trait;
7use clap::Subcommand;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::signal;
11use tracing::{debug, error, info, warn};
12use vkteams_bot::prelude::{Bot, ResponseEventsGet};
13#[cfg(feature = "storage")]
14use vkteams_bot::storage::StorageManager;
15
// Subcommands of the `daemon` CLI command: `start`, `stop` and `status`.
//
// NOTE(review): plain `//` comments are used on purpose. With clap's derive
// macros, `///` doc comments are presumably picked up as user-visible help
// text, which would change program output — confirm before converting.
#[derive(Subcommand, Debug, Clone)]
pub enum DaemonCommands {
    // `daemon start`: run the event-listening daemon.
    #[command(name = "start")]
    Start {
        // `-f` / `--foreground`: run in the foreground. Without it the
        // background path is taken, which currently returns a
        // "not yet implemented" error.
        #[arg(short, long)]
        foreground: bool,

        // `--pid-file PATH`: accepted, but not read by `execute` in this file.
        #[arg(long, value_name = "PATH")]
        pid_file: Option<String>,

        // `--auto-save`: when set, incoming events are handed to an
        // `AutoSaveEventProcessor`.
        #[arg(long)]
        auto_save: bool,

        // `--chat-id`: accepted, but not read by `execute` in this file.
        #[arg(long)]
        chat_id: Option<String>,
    },

    // `daemon stop`: currently returns a "not yet implemented" error.
    #[command(name = "stop")]
    Stop {
        // `--pid-file PATH`: accepted, but not read by `execute` in this file.
        #[arg(long, value_name = "PATH")]
        pid_file: Option<String>,
    },

    // `daemon status`: report whether a daemon appears to be running.
    #[command(name = "status")]
    Status {
        // `--pid-file PATH`: PID file to inspect. Only the formatted-output
        // path (`execute_with_output`) reads it; when absent, a default
        // location under the user data directory is used.
        #[arg(long, value_name = "PATH")]
        pid_file: Option<String>,
    },
}
54
55#[async_trait]
56impl Command for DaemonCommands {
57 async fn execute(&self, bot: &Bot) -> CliResult<()> {
58 match self {
59 DaemonCommands::Start {
60 foreground,
61 auto_save,
62 ..
63 } => {
64 if *foreground {
65 start_foreground_daemon(bot, *auto_save).await
66 } else {
67 start_background_daemon(bot, *auto_save).await
68 }
69 }
70 DaemonCommands::Stop { .. } => stop_daemon().await,
71 DaemonCommands::Status { .. } => check_daemon_status().await,
72 }
73 }
74
75 async fn execute_with_output(&self, bot: &Bot, format: &OutputFormat) -> CliResult<()> {
76 let result = match self {
77 DaemonCommands::Start { .. } => {
78 self.execute(bot).await?;
79 CommandResult::success_with_message("Daemon started successfully")
80 }
81 DaemonCommands::Stop { .. } => {
82 self.execute(bot).await?;
83 CommandResult::success_with_message("Daemon stopped successfully")
84 }
85 DaemonCommands::Status { pid_file } => {
86 match get_daemon_status(pid_file.as_deref()).await {
87 Ok(status) => CommandResult::success_with_data(status),
88 Err(e) => CommandResult::error(format!("Failed to get daemon status: {e}")),
89 }
90 }
91 };
92
93 result.display(format)
94 }
95
96 fn name(&self) -> &'static str {
97 "daemon"
98 }
99}
100
/// Event processor used by the daemon's auto-save mode: hands events to the
/// storage backend (when one is available) and keeps running statistics.
pub struct AutoSaveEventProcessor {
    /// Storage backend, compiled in only with the `storage` feature. `None`
    /// when the manager could not be created or initialized.
    #[cfg(feature = "storage")]
    storage: Option<Arc<StorageManager>>,
    /// Counters and timestamps updated as batches are processed.
    stats: Arc<ProcessorStats>,
}
107
/// Live counters behind [`AutoSaveEventProcessor`]. Counters are atomics and
/// timestamps sit behind mutexes, so the struct is updated through `&self`.
pub struct ProcessorStats {
    /// Total number of events received in processed batches.
    events_processed: std::sync::atomic::AtomicU64,
    /// Number of events counted as saved.
    events_saved: std::sync::atomic::AtomicU64,
    /// Number of events whose storage call returned an error.
    events_failed: std::sync::atomic::AtomicU64,
    /// Wall-clock time (UTC) at which the last batch finished; `None` until
    /// the first non-empty batch has been processed.
    last_processed_time: std::sync::Mutex<Option<chrono::DateTime<chrono::Utc>>>,
    /// Wall-clock time (UTC) at which this stats object was created.
    start_time: std::sync::Mutex<chrono::DateTime<chrono::Utc>>,
    /// Sum of the JSON-serialized sizes of the events handed to storage.
    bytes_processed: std::sync::atomic::AtomicU64,
}
116
117impl Default for ProcessorStats {
118 fn default() -> Self {
119 Self {
120 events_processed: std::sync::atomic::AtomicU64::new(0),
121 events_saved: std::sync::atomic::AtomicU64::new(0),
122 events_failed: std::sync::atomic::AtomicU64::new(0),
123 last_processed_time: std::sync::Mutex::new(None),
124 start_time: std::sync::Mutex::new(chrono::Utc::now()),
125 bytes_processed: std::sync::atomic::AtomicU64::new(0),
126 }
127 }
128}
129
/// Point-in-time, serializable copy of [`ProcessorStats`] plus derived values.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ProcessorStatsSnapshot {
    /// Total number of events received in processed batches.
    pub events_processed: u64,
    /// Number of events counted as saved.
    pub events_saved: u64,
    /// Number of events whose storage call returned an error.
    pub events_failed: u64,
    /// UTC time at which the last batch finished, if any batch was processed.
    pub last_processed_time: Option<chrono::DateTime<chrono::Utc>>,
    /// UTC time at which the statistics were created.
    pub start_time: chrono::DateTime<chrono::Utc>,
    /// Whole seconds elapsed between `start_time` and the snapshot.
    pub uptime_seconds: i64,
    /// Sum of the JSON-serialized sizes of the events handed to storage.
    pub bytes_processed: u64,
    /// `events_processed / uptime_seconds`, or `0.0` while uptime is below
    /// one second.
    pub events_per_second: f64,
}
141
142impl AutoSaveEventProcessor {
143 pub async fn new(_config: &Config) -> CliResult<Self> {
144 #[cfg(feature = "storage")]
145 let storage = {
146 let storage_config = vkteams_bot::storage::StorageConfig::default();
149
150 match StorageManager::new(&storage_config).await {
151 Ok(manager) => {
152 if let Err(e) = manager.initialize().await {
154 warn!("Failed to initialize storage: {}", e);
155 None
156 } else {
157 info!("Storage manager initialized successfully");
158 Some(Arc::new(manager))
159 }
160 }
161 Err(e) => {
162 warn!("Failed to create storage manager: {}", e);
163 None
164 }
165 }
166 };
167
168 Ok(Self {
169 #[cfg(feature = "storage")]
170 storage,
171 stats: Arc::new(ProcessorStats::default()),
172 })
173 }
174
175 pub async fn process_events(&self, _bot: Bot, events: ResponseEventsGet) -> CliResult<()> {
177 let event_count = events.events.len();
178 if event_count == 0 {
179 return Ok(());
180 }
181
182 debug!("Auto-saving {} events to storage", event_count);
183
184 let start_time = Instant::now();
185 let mut saved_count = 0;
186 let mut failed_count = 0;
187 let mut total_bytes = 0;
188
189 #[cfg(feature = "storage")]
190 {
191 if let Some(storage) = &self.storage {
192 for event in events.events {
194 debug!(
195 "Processing event: {} (type: {:?})",
196 event.event_id, event.event_type
197 );
198
199 if let Ok(serialized) = serde_json::to_vec(&event) {
201 total_bytes += serialized.len();
202 }
203
204 match storage.process_event(&event).await {
206 Ok(event_id) => {
207 debug!(
208 "Successfully stored event {} with ID {}",
209 event.event_id, event_id
210 );
211 saved_count += 1;
212 }
213 Err(e) => {
214 error!("Failed to store event {}: {}", event.event_id, e);
215 failed_count += 1;
216 }
217 }
218 }
219 } else {
220 for event in events.events {
222 debug!(
223 "Processing event: {} (type: {:?}) - no storage available",
224 event.event_id, event.event_type
225 );
226 saved_count += 1;
227 }
228 }
229 }
230
231 #[cfg(not(feature = "storage"))]
232 {
233 for event in events.events {
235 debug!(
236 "Processing event: {} (type: {:?}) - storage not enabled",
237 event.event_id, event.event_type
238 );
239 saved_count += 1;
240 }
241 }
242
243 let duration = start_time.elapsed();
244
245 self.stats
247 .events_processed
248 .fetch_add(event_count as u64, std::sync::atomic::Ordering::Relaxed);
249 self.stats
250 .events_saved
251 .fetch_add(saved_count, std::sync::atomic::Ordering::Relaxed);
252 self.stats
253 .events_failed
254 .fetch_add(failed_count, std::sync::atomic::Ordering::Relaxed);
255 self.stats
256 .bytes_processed
257 .fetch_add(total_bytes as u64, std::sync::atomic::Ordering::Relaxed);
258
259 if let Ok(mut last_time) = self.stats.last_processed_time.lock() {
260 *last_time = Some(chrono::Utc::now());
261 }
262
263 info!(
264 "Processed {} events in {:?}: {} saved, {} failed, {} bytes processed",
265 event_count, duration, saved_count, failed_count, total_bytes
266 );
267
268 if failed_count > 0 {
269 warn!(
270 "{} events failed to save - check storage connection",
271 failed_count
272 );
273 }
274
275 Ok(())
276 }
277
278 pub fn get_stats(&self) -> ProcessorStatsSnapshot {
280 let start_time = *self.stats.start_time.lock().unwrap();
281 let now = chrono::Utc::now();
282 let uptime_seconds = (now - start_time).num_seconds();
283 let events_processed = self
284 .stats
285 .events_processed
286 .load(std::sync::atomic::Ordering::Relaxed);
287
288 ProcessorStatsSnapshot {
289 events_processed,
290 events_saved: self
291 .stats
292 .events_saved
293 .load(std::sync::atomic::Ordering::Relaxed),
294 events_failed: self
295 .stats
296 .events_failed
297 .load(std::sync::atomic::Ordering::Relaxed),
298 last_processed_time: *self.stats.last_processed_time.lock().unwrap(),
299 start_time,
300 uptime_seconds,
301 bytes_processed: self
302 .stats
303 .bytes_processed
304 .load(std::sync::atomic::Ordering::Relaxed),
305 events_per_second: if uptime_seconds > 0 {
306 events_processed as f64 / uptime_seconds as f64
307 } else {
308 0.0
309 },
310 }
311 }
312}
313
314async fn start_foreground_daemon(bot: &Bot, auto_save: bool) -> CliResult<()> {
316 info!(
317 "Starting VKTeams Bot daemon in foreground mode with auto_save={}",
318 auto_save
319 );
320
321 let processor = if auto_save {
322 let config = crate::config::UnifiedConfigAdapter::load()
324 .map_err(|e| crate::errors::prelude::CliError::Config(e.to_string()))?;
325 Some(Arc::new(AutoSaveEventProcessor::new(&config).await?))
326 } else {
327 None
328 };
329
330 let shutdown_signal = setup_shutdown_signal();
332
333 let event_processor = {
335 let processor_clone = processor.clone();
336 move |bot: Bot, events: ResponseEventsGet| {
337 let processor_inner = processor_clone.clone();
338 async move {
339 let result = if let Some(processor) = processor_inner {
340 processor.process_events(bot, events).await
341 } else {
342 debug!(
343 "Received {} events (auto-save disabled)",
344 events.events.len()
345 );
346 Ok(())
347 };
348
349 result.map_err(|e| vkteams_bot::error::BotError::System(e.to_string()))
351 }
352 }
353 };
354
355 tokio::select! {
356 result = bot.event_listener(event_processor) => {
357 match result {
358 Ok(_) => info!("Event listener finished successfully"),
359 Err(e) => error!("Event listener error: {}", e),
360 }
361 }
362 _ = shutdown_signal => {
363 info!("Received shutdown signal, stopping daemon...");
364 }
365 }
366
367 Ok(())
368}
369
370async fn start_background_daemon(_bot: &Bot, _auto_save: bool) -> CliResult<()> {
372 Err(crate::errors::prelude::CliError::UnexpectedError(
374 "Background daemon mode not yet implemented. Use --foreground flag.".to_string(),
375 ))
376}
377
378async fn stop_daemon() -> CliResult<()> {
380 Err(crate::errors::prelude::CliError::UnexpectedError(
381 "Daemon stop not yet implemented.".to_string(),
382 ))
383}
384
385async fn check_daemon_status() -> CliResult<()> {
387 info!("Daemon status check - not yet implemented");
388 Ok(())
389}
390
391async fn get_daemon_status(pid_file: Option<&str>) -> CliResult<serde_json::Value> {
393 use chrono::{DateTime, Utc};
394 use std::path::PathBuf;
395
396 let pid_file_path = if let Some(path) = pid_file {
398 PathBuf::from(path)
399 } else {
400 let mut data_dir = dirs::data_dir()
402 .ok_or_else(|| CliError::Config("Cannot determine data directory".to_string()))?;
403 data_dir.push("vkteams-bot");
404 data_dir.push("daemon.pid");
405 data_dir
406 };
407
408 if !pid_file_path.exists() {
410 return Ok(serde_json::json!({
411 "status": "not_running",
412 "reason": "No PID file found",
413 "pid_file": pid_file_path
414 }));
415 }
416
417 let pid_content = tokio::fs::read_to_string(&pid_file_path)
419 .await
420 .map_err(|e| CliError::FileError(format!("Failed to read PID file: {e}")))?;
421
422 let lines: Vec<&str> = pid_content.trim().split('\n').collect();
423 if lines.len() < 2 {
424 return Ok(serde_json::json!({
425 "status": "error",
426 "reason": "Invalid PID file format",
427 "pid_file": pid_file_path
428 }));
429 }
430
431 let pid: u32 = lines[0]
432 .parse()
433 .map_err(|_| CliError::InputError("Invalid PID in file".to_string()))?;
434
435 let started_at = DateTime::parse_from_rfc3339(lines[1])
436 .map_err(|_| CliError::InputError("Invalid timestamp in PID file".to_string()))?
437 .with_timezone(&Utc);
438
439 let is_running = is_process_running(pid);
441
442 if is_running {
443 let uptime = Utc::now().signed_duration_since(started_at);
445 let uptime_str = format_duration(uptime);
446
447 let memory_usage = get_process_memory_usage(pid).unwrap_or_else(|| "unknown".to_string());
449
450 Ok(serde_json::json!({
451 "status": "running",
452 "pid": pid,
453 "started_at": started_at.to_rfc3339(),
454 "uptime": uptime_str,
455 "memory_usage": memory_usage,
456 "pid_file": pid_file_path
457 }))
458 } else {
459 Ok(serde_json::json!({
460 "status": "stale",
461 "reason": "PID file exists but process is not running",
462 "pid": pid,
463 "started_at": started_at.to_rfc3339(),
464 "pid_file": pid_file_path
465 }))
466 }
467}
468
469async fn setup_shutdown_signal() {
471 #[cfg(unix)]
472 {
473 use tokio::signal::unix::{SignalKind, signal};
474
475 let mut sigterm =
476 signal(SignalKind::terminate()).expect("Failed to create SIGTERM handler");
477 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT handler");
478
479 tokio::select! {
480 _ = sigterm.recv() => debug!("Received SIGTERM"),
481 _ = sigint.recv() => debug!("Received SIGINT"),
482 _ = signal::ctrl_c() => debug!("Received Ctrl+C"),
483 }
484 }
485
486 #[cfg(windows)]
487 {
488 let _ = signal::ctrl_c().await;
489 debug!("Received Ctrl+C");
490 }
491}
492
/// Reports whether a process with the given PID currently exists.
///
/// On Unix this runs `kill -0 <pid>`; on Windows it filters `tasklist` by
/// PID. Other platforms always report `false`, as does any failure to launch
/// the helper command.
///
/// PID 0 is rejected up front: it never identifies a daemon, and on Unix
/// `kill -0 0` addresses the caller's own process group and so always
/// succeeds. On Unix, PIDs above `i32::MAX` are rejected as well because they
/// cannot be represented as a `pid_t`.
///
/// NOTE(review): `kill -0` presumably also fails for a live process owned by
/// another user (permission denied), which is reported as "not running" —
/// confirm whether that matters for the deployment.
fn is_process_running(pid: u32) -> bool {
    if pid == 0 {
        return false;
    }

    #[cfg(unix)]
    {
        use std::process::Command;

        if i32::try_from(pid).is_err() {
            return false;
        }

        Command::new("kill")
            .args(["-0", &pid.to_string()])
            .output()
            .map(|output| output.status.success())
            .unwrap_or(false)
    }

    #[cfg(windows)]
    {
        use std::process::Command;

        match Command::new("tasklist")
            .args(["/FI", &format!("PID eq {}", pid), "/FO", "CSV"])
            .output()
        {
            Ok(output) => {
                // A match prints a header row plus one data row; no match
                // prints a single informational line.
                let output_str = String::from_utf8_lossy(&output.stdout);
                output_str.lines().count() > 1
            }
            Err(_) => false,
        }
    }

    #[cfg(not(any(unix, windows)))]
    {
        false
    }
}
526
527fn format_duration(duration: chrono::Duration) -> String {
529 let seconds = duration.num_seconds();
530 let minutes = seconds / 60;
531 let hours = minutes / 60;
532 let days = hours / 24;
533
534 if days > 0 {
535 format!("{}d {}h {}m", days, hours % 24, minutes % 60)
536 } else if hours > 0 {
537 format!("{}h {}m", hours, minutes % 60)
538 } else if minutes > 0 {
539 format!("{minutes}m")
540 } else {
541 format!("{seconds}s")
542 }
543}
544
/// Returns a human-readable memory figure for the given process, if one can
/// be obtained.
///
/// On Unix the resident set size from `ps -o rss=` is converted from KiB to
/// whole megabytes and returned as `"<n>MB"`. On Windows the "Mem Usage"
/// column of `tasklist` is returned verbatim (for example `"12,345 K"`).
/// Returns `None` when the helper command cannot be run, when its output
/// cannot be parsed, or on other platforms.
fn get_process_memory_usage(pid: u32) -> Option<String> {
    #[cfg(unix)]
    {
        use std::process::Command;

        let output = Command::new("ps")
            .args(["-p", &pid.to_string(), "-o", "rss="])
            .output()
            .ok()?;
        // Empty or non-numeric output (no such process) fails the parse.
        let rss_kb: u64 = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .ok()?;
        Some(format!("{}MB", rss_kb / 1024))
    }

    #[cfg(windows)]
    {
        use std::process::Command;

        let output = Command::new("tasklist")
            .args(["/FI", &format!("PID eq {}", pid), "/FO", "CSV", "/NH"])
            .output()
            .ok()?;
        let output_str = String::from_utf8_lossy(&output.stdout);
        let row = output_str
            .lines()
            .map(str::trim)
            .find(|line| !line.is_empty())?;

        // Every CSV field is quoted and the memory column itself contains a
        // thousands separator ("12,345 K"), so split on the `","` delimiter
        // between fields rather than on bare commas.
        row.trim_matches('"')
            .split("\",\"")
            .nth(4)
            .map(str::to_string)
    }

    #[cfg(not(any(unix, windows)))]
    {
        None
    }
}
597
598#[cfg(test)]
599mod tests {
600 use super::*;
601 use chrono::{Duration, Utc};
602 use std::sync::atomic::Ordering;
603 use tokio;
604 use vkteams_bot::prelude::{EventMessage, EventType};
605
606 #[test]
607 fn test_processor_stats() {
608 let processor = AutoSaveEventProcessor {
609 #[cfg(feature = "storage")]
610 storage: None,
611 stats: Arc::new(ProcessorStats::default()),
612 };
613
614 let stats = processor.get_stats();
616 assert_eq!(stats.events_processed, 0);
617 assert_eq!(stats.events_saved, 0);
618 assert_eq!(stats.events_failed, 0);
619 assert!(stats.last_processed_time.is_none());
620
621 processor
623 .stats
624 .events_processed
625 .store(100, Ordering::Relaxed);
626 processor.stats.events_saved.store(95, Ordering::Relaxed);
627 processor.stats.events_failed.store(5, Ordering::Relaxed);
628
629 let updated_stats = processor.get_stats();
630 assert_eq!(updated_stats.events_processed, 100);
631 assert_eq!(updated_stats.events_saved, 95);
632 assert_eq!(updated_stats.events_failed, 5);
633 assert!(updated_stats.uptime_seconds >= 0);
634 }
635
636 #[test]
637 fn test_daemon_command_name() {
638 let cmd = DaemonCommands::Status { pid_file: None };
639 assert_eq!(cmd.name(), "daemon");
640 }
641
642 #[test]
643 fn test_daemon_commands_variants() {
644 let start_cmd = DaemonCommands::Start {
646 foreground: true,
647 pid_file: Some("/tmp/test.pid".to_string()),
648 auto_save: true,
649 chat_id: Some("test-chat".to_string()),
650 };
651 assert_eq!(start_cmd.name(), "daemon");
652
653 let stop_cmd = DaemonCommands::Stop {
655 pid_file: Some("/tmp/test.pid".to_string()),
656 };
657 assert_eq!(stop_cmd.name(), "daemon");
658
659 let status_cmd = DaemonCommands::Status { pid_file: None };
661 assert_eq!(status_cmd.name(), "daemon");
662 }
663
664 #[test]
665 fn test_processor_stats_default() {
666 let stats = ProcessorStats::default();
667 assert_eq!(stats.events_processed.load(Ordering::Relaxed), 0);
668 assert_eq!(stats.events_saved.load(Ordering::Relaxed), 0);
669 assert_eq!(stats.events_failed.load(Ordering::Relaxed), 0);
670 assert_eq!(stats.bytes_processed.load(Ordering::Relaxed), 0);
671 assert!(stats.last_processed_time.lock().unwrap().is_none());
672 assert!(
673 Utc::now()
674 .signed_duration_since(*stats.start_time.lock().unwrap())
675 .num_seconds()
676 >= 0
677 );
678 }
679
680 #[test]
681 fn test_processor_stats_snapshot_creation() {
682 let processor = AutoSaveEventProcessor {
683 #[cfg(feature = "storage")]
684 storage: None,
685 stats: Arc::new(ProcessorStats::default()),
686 };
687
688 processor
690 .stats
691 .events_processed
692 .store(50, Ordering::Relaxed);
693 processor.stats.events_saved.store(45, Ordering::Relaxed);
694 processor.stats.events_failed.store(5, Ordering::Relaxed);
695 processor
696 .stats
697 .bytes_processed
698 .store(1024, Ordering::Relaxed);
699
700 if let Ok(mut last_time) = processor.stats.last_processed_time.lock() {
702 *last_time = Some(Utc::now());
703 }
704
705 let snapshot = processor.get_stats();
706 assert_eq!(snapshot.events_processed, 50);
707 assert_eq!(snapshot.events_saved, 45);
708 assert_eq!(snapshot.events_failed, 5);
709 assert_eq!(snapshot.bytes_processed, 1024);
710 assert!(snapshot.last_processed_time.is_some());
711 assert!(snapshot.uptime_seconds >= 0);
712 assert!(snapshot.events_per_second >= 0.0);
713 }
714
715 #[tokio::test]
716 async fn test_process_events_empty() {
717 let processor = AutoSaveEventProcessor {
718 #[cfg(feature = "storage")]
719 storage: None,
720 stats: Arc::new(ProcessorStats::default()),
721 };
722
723 let events = ResponseEventsGet { events: vec![] };
724
725 let bot = vkteams_bot::Bot::with_params(
726 &vkteams_bot::prelude::APIVersionUrl::V1,
727 "test_token",
728 "https://test.api.url",
729 )
730 .unwrap();
731 let result = processor.process_events(bot, events).await;
732 assert!(result.is_ok());
733
734 let stats = processor.get_stats();
735 assert_eq!(stats.events_processed, 0);
736 }
737
738 #[tokio::test]
739 async fn test_process_events_with_events() {
740 let processor = AutoSaveEventProcessor {
741 #[cfg(feature = "storage")]
742 storage: None,
743 stats: Arc::new(ProcessorStats::default()),
744 };
745
746 use vkteams_bot::prelude::{
747 Chat, ChatId, EventPayloadEditedMessage, EventPayloadNewMessage, From, MsgId,
748 Timestamp, UserId,
749 };
750
751 let test_chat = Chat {
752 chat_id: ChatId::from("test_chat"),
753 chat_type: "private".to_string(),
754 title: Some("Test Chat".to_string()),
755 };
756
757 let test_from = From {
758 user_id: UserId("test_user".to_string()),
759 first_name: "Test".to_string(),
760 last_name: Some("User".to_string()),
761 };
762
763 let events = ResponseEventsGet {
764 events: vec![
765 EventMessage {
766 event_id: 1,
767 event_type: EventType::NewMessage(Box::new(EventPayloadNewMessage {
768 msg_id: MsgId("test_msg".to_string()),
769 text: "test message".to_string(),
770 chat: test_chat.clone(),
771 from: test_from.clone(),
772 format: None,
773 parts: vec![],
774 timestamp: Timestamp(1234567890),
775 })),
776 },
777 EventMessage {
778 event_id: 2,
779 event_type: EventType::EditedMessage(Box::new(EventPayloadEditedMessage {
780 msg_id: MsgId("test_msg_2".to_string()),
781 text: "edited message".to_string(),
782 timestamp: Timestamp(1234567890),
783 chat: test_chat,
784 from: test_from,
785 format: None,
786 edited_timestamp: Timestamp(1234567900),
787 })),
788 },
789 ],
790 };
791
792 let bot = vkteams_bot::Bot::with_params(
793 &vkteams_bot::prelude::APIVersionUrl::V1,
794 "test_token",
795 "https://test.api.url",
796 )
797 .unwrap();
798 let result = processor.process_events(bot, events).await;
799 assert!(result.is_ok());
800
801 let stats = processor.get_stats();
802 assert_eq!(stats.events_processed, 2);
803 }
804
805 #[tokio::test]
806 async fn test_start_background_daemon_error() {
807 let bot = vkteams_bot::Bot::with_params(
808 &vkteams_bot::prelude::APIVersionUrl::V1,
809 "test_token",
810 "https://test.api.url",
811 )
812 .unwrap();
813 let result = start_background_daemon(&bot, false).await;
814 assert!(result.is_err());
815 match result {
816 Err(CliError::UnexpectedError(msg)) => {
817 assert!(msg.contains("Background daemon mode not yet implemented"));
818 }
819 _ => panic!("Expected UnexpectedError"),
820 }
821 }
822
823 #[tokio::test]
824 async fn test_stop_daemon_error() {
825 let result = stop_daemon().await;
826 assert!(result.is_err());
827 match result {
828 Err(CliError::UnexpectedError(msg)) => {
829 assert!(msg.contains("Daemon stop not yet implemented"));
830 }
831 _ => panic!("Expected UnexpectedError"),
832 }
833 }
834
835 #[tokio::test]
836 async fn test_check_daemon_status_success() {
837 let result = check_daemon_status().await;
838 assert!(result.is_ok());
839 }
840
841 #[test]
842 fn test_format_duration() {
843 let duration = Duration::seconds(30);
845 assert_eq!(format_duration(duration), "30s");
846
847 let duration = Duration::minutes(5);
849 assert_eq!(format_duration(duration), "5m");
850
851 let duration = Duration::hours(2);
853 assert_eq!(format_duration(duration), "2h 0m");
854
855 let duration = Duration::days(1) + Duration::hours(3) + Duration::minutes(15);
857 assert_eq!(format_duration(duration), "1d 3h 15m");
858
859 let duration = Duration::days(10) + Duration::hours(5) + Duration::minutes(30);
861 assert_eq!(format_duration(duration), "10d 5h 30m");
862 }
863
864 #[test]
865 fn test_is_process_running() {
866 let current_pid = std::process::id();
868 assert!(is_process_running(current_pid));
869
870 assert!(!is_process_running(999999));
872 }
873
874 #[test]
875 fn test_get_process_memory_usage() {
876 #[cfg(unix)]
878 {
879 let memory = get_process_memory_usage(1);
880 assert!(memory.is_some() || memory.is_none());
882 }
883
884 let memory = get_process_memory_usage(999999);
886 assert!(memory.is_none());
887 }
888
889 #[tokio::test]
890 async fn test_get_daemon_status_no_pid_file() {
891 let result = get_daemon_status(Some("/nonexistent/path/daemon.pid")).await;
892 assert!(result.is_ok());
893
894 let status = result.unwrap();
895 assert_eq!(status["status"], "not_running");
896 assert_eq!(status["reason"], "No PID file found");
897 }
898
899 #[tokio::test]
900 async fn test_autosave_processor_new() {
901 let config = Config::default();
902 let result = AutoSaveEventProcessor::new(&config).await;
903 assert!(result.is_ok());
904
905 let processor = result.unwrap();
906 let stats = processor.get_stats();
907 assert_eq!(stats.events_processed, 0);
908 assert_eq!(stats.events_saved, 0);
909 assert_eq!(stats.events_failed, 0);
910 }
911
912 #[test]
913 fn test_processor_stats_with_events_per_second() {
914 let processor = AutoSaveEventProcessor {
915 #[cfg(feature = "storage")]
916 storage: None,
917 stats: Arc::new(ProcessorStats::default()),
918 };
919
920 std::thread::sleep(std::time::Duration::from_millis(100));
922
923 processor
924 .stats
925 .events_processed
926 .store(10, Ordering::Relaxed);
927
928 let stats = processor.get_stats();
929 assert_eq!(stats.events_processed, 10);
930 assert!(stats.uptime_seconds >= 0);
931
932 if stats.uptime_seconds > 0 {
934 assert!(stats.events_per_second >= 0.0);
935 } else {
936 assert_eq!(stats.events_per_second, 0.0);
937 }
938 }
939}