tether_utils/tether_topics/
insights.rs

1use circular_buffer::CircularBuffer;
2use tether_agent::{three_part_topic::TetherOrCustomTopic, PlugOptionsBuilder, TetherAgent};
3
4use crate::tether_topics::{agent_tree::AgentTree, sampler::Sampler};
5use std::{
6    fmt,
7    time::{Duration, SystemTime},
8};
9
10use super::{parse_agent_id, parse_agent_role, parse_plug_name, TopicOptions};
11pub const MONITOR_LOG_LENGTH: usize = 256;
12
13/// Topic, Payload as JSON
14type MessageLogEntry = (String, String);
15
16pub struct Insights {
17    topics: Vec<String>,
18    roles: Vec<String>,
19    ids: Vec<String>,
20    plugs: Vec<String>,
21    trees: Vec<AgentTree>,
22    message_count: u128,
23    log_start: Option<SystemTime>,
24    message_log: CircularBuffer<MONITOR_LOG_LENGTH, MessageLogEntry>,
25    sampler: Sampler,
26}
27
28impl fmt::Display for Insights {
29    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
30        let topics = format!("x{} Topics: {:?} \n\n", self.topics().len(), self.topics());
31        let roles = format!("x{} Roles: {:?} \n", self.roles().len(), self.roles());
32        let ids = format!("x{} IDs: {:?} \n", self.ids().len(), self.ids());
33        let plugs = format!("x{} Plugs: {:?} \n", self.plugs().len(), self.plugs());
34
35        let trees_formatted = self.trees.iter().map(|x| x.to_string()).collect::<String>();
36
37        write!(f, "{}{}{}{}{}", topics, roles, ids, plugs, trees_formatted)
38    }
39}
40
41impl Insights {
42    pub fn new(options: &TopicOptions, tether_agent: &mut TetherAgent) -> Self {
43        if !tether_agent.is_connected() {
44            panic!("Insights utility needs already-connected Tether Agent");
45        }
46        let _input_plug = PlugOptionsBuilder::create_input("monitor")
47            .topic(Some(options.topic.clone()).as_deref())
48            .build(tether_agent)
49            .expect("failed to connect Tether");
50
51        Insights {
52            topics: Vec::new(),
53            roles: Vec::new(),
54            ids: Vec::new(),
55            plugs: Vec::new(),
56            trees: Vec::new(),
57            message_log: CircularBuffer::new(),
58            message_count: 0,
59            log_start: None,
60            sampler: Sampler::new(options.sampler_interval),
61        }
62    }
63
64    pub fn sample(&mut self) -> bool {
65        self.sampler.add_sample(self.message_count)
66    }
67
68    pub fn sampler(&self) -> &Sampler {
69        &self.sampler
70    }
71
72    pub fn update(&mut self, topic: &TetherOrCustomTopic, payload: Vec<u8>) -> bool {
73        self.message_count += 1;
74
75        if self.log_start.is_none() {
76            self.log_start = Some(SystemTime::now());
77        }
78
79        let full_topic_string = topic.full_topic_string();
80
81        if payload.is_empty() {
82            self.message_log
83                .push_back((String::from(&full_topic_string), "[EMPTY_MESSAGE]".into()));
84        } else {
85            let value: rmpv::Value =
86                rmp_serde::from_slice(&payload).expect("failed to decode msgpack");
87            let json = serde_json::to_string(&value).expect("failed to stringify JSON");
88            self.message_log
89                .push_back((String::from(&full_topic_string), json));
90        }
91
92        let mut did_change = false;
93
94        // Collect some stats...
95        if add_if_unique(&full_topic_string, &mut self.topics) {
96            did_change = true;
97        }
98        if add_if_unique(
99            parse_agent_role(&full_topic_string).unwrap_or("unknown"),
100            &mut self.roles,
101        ) {
102            did_change = true;
103        }
104        if add_if_unique(
105            parse_agent_id(&full_topic_string).unwrap_or("unknown"),
106            &mut self.ids,
107        ) {
108            did_change = true;
109        }
110        if add_if_unique(
111            parse_plug_name(&full_topic_string).unwrap_or("unknown"),
112            &mut self.plugs,
113        ) {
114            did_change = true;
115        }
116
117        if did_change {
118            self.trees = self
119                .roles()
120                .iter()
121                .map(|role| AgentTree::new(role.as_str(), self.topics()))
122                .collect::<Vec<AgentTree>>();
123        }
124
125        did_change
126    }
127
128    pub fn topics(&self) -> &[String] {
129        &self.topics
130    }
131
132    pub fn roles(&self) -> &[String] {
133        &self.roles
134    }
135    pub fn ids(&self) -> &[String] {
136        &self.ids
137    }
138    pub fn plugs(&self) -> &[String] {
139        &self.plugs
140    }
141    pub fn trees(&self) -> &[AgentTree] {
142        &self.trees
143    }
144
145    pub fn message_count(&self) -> u128 {
146        self.message_count
147    }
148
149    pub fn message_log(&self) -> &CircularBuffer<MONITOR_LOG_LENGTH, MessageLogEntry> {
150        &self.message_log
151    }
152    pub fn since_log_start(&self) -> Option<Duration> {
153        self.log_start
154            .map(|t| t.elapsed().unwrap_or(Duration::ZERO))
155    }
156
157    /// Messages per second, since log_start was (re)set
158    pub fn get_rate(&self) -> Option<f32> {
159        match self.log_start {
160            Some(t) => {
161                if let Ok(elapsed) = t.elapsed() {
162                    Some(self.message_count as f32 / elapsed.as_secs_f32())
163                } else {
164                    None
165                }
166            }
167            None => None,
168        }
169    }
170}
171
172fn add_if_unique(item: &str, list: &mut Vec<String>) -> bool {
173    if !list.iter().any(|i| i.eq(item)) {
174        list.push(String::from(item));
175        true
176    } else {
177        false
178    }
179}