mecha10_core/
topic_utils.rs

1//! Topic utility functions
2//!
3//! Provides common utilities for working with topics including:
4//! - Topic path parsing and categorization
5//! - Time formatting for topic metadata
6//! - Topic metadata extraction from Redis streams
7
8use std::time::SystemTime;
9
10/// Extract category from topic path (first segment)
11///
12/// # Examples
13/// ```
14/// use mecha10_core::topic_utils::extract_category;
15///
16/// assert_eq!(extract_category("/sensor/camera"), "/sensor");
17/// assert_eq!(extract_category("/actuator/motor/left"), "/actuator");
18/// assert_eq!(extract_category("unknown"), "uncategorized");
19/// ```
20pub fn extract_category(topic: &str) -> String {
21    let parts: Vec<&str> = topic.trim_start_matches('/').split('/').collect();
22    if parts.is_empty() || parts[0].is_empty() {
23        "uncategorized".to_string()
24    } else {
25        format!("/{}", parts[0])
26    }
27}
28
29/// Format elapsed time in human-readable format
30///
31/// # Examples
32/// ```
33/// use mecha10_core::topic_utils::format_elapsed_time;
34///
35/// assert_eq!(format_elapsed_time(500), "500ms ago");
36/// assert_eq!(format_elapsed_time(5000), "5s ago");
37/// assert_eq!(format_elapsed_time(65000), "1m ago");
38/// ```
39pub fn format_elapsed_time(elapsed_ms: u64) -> String {
40    if elapsed_ms < 1000 {
41        format!("{}ms ago", elapsed_ms)
42    } else if elapsed_ms < 60_000 {
43        format!("{}s ago", elapsed_ms / 1000)
44    } else if elapsed_ms < 3_600_000 {
45        format!("{}m ago", elapsed_ms / 60_000)
46    } else if elapsed_ms < 86_400_000 {
47        format!("{}h ago", elapsed_ms / 3_600_000)
48    } else {
49        format!("{}d ago", elapsed_ms / 86_400_000)
50    }
51}
52
53/// Topic metadata extracted from Redis stream info
54#[derive(Debug, Clone)]
55pub struct TopicMetadata {
56    /// Topic path
57    pub topic: String,
58    /// Number of messages in the stream
59    pub message_count: Option<usize>,
60    /// Timestamp of last message (milliseconds since epoch)
61    pub last_update_ms: Option<u64>,
62    /// Timestamp of first message (milliseconds since epoch)
63    pub first_timestamp_ms: Option<u64>,
64    /// Timestamp of last message (milliseconds since epoch)
65    pub last_timestamp_ms: Option<u64>,
66}
67
68impl TopicMetadata {
69    /// Create new topic metadata
70    pub fn new(topic: String) -> Self {
71        Self {
72            topic,
73            message_count: None,
74            last_update_ms: None,
75            first_timestamp_ms: None,
76            last_timestamp_ms: None,
77        }
78    }
79
80    /// Set message count
81    pub fn with_message_count(mut self, count: usize) -> Self {
82        self.message_count = Some(count);
83        self
84    }
85
86    /// Set last update timestamp
87    pub fn with_last_update(mut self, timestamp_ms: u64) -> Self {
88        self.last_update_ms = Some(timestamp_ms);
89        self
90    }
91
92    /// Set timestamp range
93    pub fn with_timestamp_range(mut self, first_ms: u64, last_ms: u64) -> Self {
94        self.first_timestamp_ms = Some(first_ms);
95        self.last_timestamp_ms = Some(last_ms);
96        self
97    }
98
99    /// Calculate elapsed time since last update
100    pub fn elapsed_since_last_update(&self) -> Option<u64> {
101        self.last_update_ms.map(|last_ms| {
102            let now_ms = SystemTime::now()
103                .duration_since(SystemTime::UNIX_EPOCH)
104                .unwrap()
105                .as_millis() as u64;
106            now_ms.saturating_sub(last_ms)
107        })
108    }
109
110    /// Calculate message rate (messages per second)
111    pub fn calculate_rate(&self) -> Option<f64> {
112        if let (Some(first_ms), Some(last_ms), Some(count)) =
113            (self.first_timestamp_ms, self.last_timestamp_ms, self.message_count)
114        {
115            let duration_ms = last_ms.saturating_sub(first_ms);
116            if duration_ms > 1000 && count > 0 {
117                return Some((count as f64 * 1000.0) / duration_ms as f64);
118            }
119        }
120        None
121    }
122
123    /// Get human-readable last update time
124    pub fn last_update_str(&self) -> Option<String> {
125        self.elapsed_since_last_update().map(format_elapsed_time)
126    }
127
128    /// Get topic category
129    pub fn category(&self) -> String {
130        extract_category(&self.topic)
131    }
132}