tether_utils/tether_topics/
insights.rs1use circular_buffer::CircularBuffer;
2use tether_agent::{three_part_topic::TetherOrCustomTopic, PlugOptionsBuilder, TetherAgent};
3
4use crate::tether_topics::{agent_tree::AgentTree, sampler::Sampler};
5use std::{
6 fmt,
7 time::{Duration, SystemTime},
8};
9
10use super::{parse_agent_id, parse_agent_role, parse_plug_name, TopicOptions};
11pub const MONITOR_LOG_LENGTH: usize = 256;
12
13type MessageLogEntry = (String, String);
15
16pub struct Insights {
17 topics: Vec<String>,
18 roles: Vec<String>,
19 ids: Vec<String>,
20 plugs: Vec<String>,
21 trees: Vec<AgentTree>,
22 message_count: u128,
23 log_start: Option<SystemTime>,
24 message_log: CircularBuffer<MONITOR_LOG_LENGTH, MessageLogEntry>,
25 sampler: Sampler,
26}
27
28impl fmt::Display for Insights {
29 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
30 let topics = format!("x{} Topics: {:?} \n\n", self.topics().len(), self.topics());
31 let roles = format!("x{} Roles: {:?} \n", self.roles().len(), self.roles());
32 let ids = format!("x{} IDs: {:?} \n", self.ids().len(), self.ids());
33 let plugs = format!("x{} Plugs: {:?} \n", self.plugs().len(), self.plugs());
34
35 let trees_formatted = self.trees.iter().map(|x| x.to_string()).collect::<String>();
36
37 write!(f, "{}{}{}{}{}", topics, roles, ids, plugs, trees_formatted)
38 }
39}
40
41impl Insights {
42 pub fn new(options: &TopicOptions, tether_agent: &mut TetherAgent) -> Self {
43 if !tether_agent.is_connected() {
44 panic!("Insights utility needs already-connected Tether Agent");
45 }
46 let _input_plug = PlugOptionsBuilder::create_input("monitor")
47 .topic(Some(options.topic.clone()).as_deref())
48 .build(tether_agent)
49 .expect("failed to connect Tether");
50
51 Insights {
52 topics: Vec::new(),
53 roles: Vec::new(),
54 ids: Vec::new(),
55 plugs: Vec::new(),
56 trees: Vec::new(),
57 message_log: CircularBuffer::new(),
58 message_count: 0,
59 log_start: None,
60 sampler: Sampler::new(options.sampler_interval),
61 }
62 }
63
64 pub fn sample(&mut self) -> bool {
65 self.sampler.add_sample(self.message_count)
66 }
67
68 pub fn sampler(&self) -> &Sampler {
69 &self.sampler
70 }
71
72 pub fn update(&mut self, topic: &TetherOrCustomTopic, payload: Vec<u8>) -> bool {
73 self.message_count += 1;
74
75 if self.log_start.is_none() {
76 self.log_start = Some(SystemTime::now());
77 }
78
79 let full_topic_string = topic.full_topic_string();
80
81 if payload.is_empty() {
82 self.message_log
83 .push_back((String::from(&full_topic_string), "[EMPTY_MESSAGE]".into()));
84 } else {
85 let value: rmpv::Value =
86 rmp_serde::from_slice(&payload).expect("failed to decode msgpack");
87 let json = serde_json::to_string(&value).expect("failed to stringify JSON");
88 self.message_log
89 .push_back((String::from(&full_topic_string), json));
90 }
91
92 let mut did_change = false;
93
94 if add_if_unique(&full_topic_string, &mut self.topics) {
96 did_change = true;
97 }
98 if add_if_unique(
99 parse_agent_role(&full_topic_string).unwrap_or("unknown"),
100 &mut self.roles,
101 ) {
102 did_change = true;
103 }
104 if add_if_unique(
105 parse_agent_id(&full_topic_string).unwrap_or("unknown"),
106 &mut self.ids,
107 ) {
108 did_change = true;
109 }
110 if add_if_unique(
111 parse_plug_name(&full_topic_string).unwrap_or("unknown"),
112 &mut self.plugs,
113 ) {
114 did_change = true;
115 }
116
117 if did_change {
118 self.trees = self
119 .roles()
120 .iter()
121 .map(|role| AgentTree::new(role.as_str(), self.topics()))
122 .collect::<Vec<AgentTree>>();
123 }
124
125 did_change
126 }
127
128 pub fn topics(&self) -> &[String] {
129 &self.topics
130 }
131
132 pub fn roles(&self) -> &[String] {
133 &self.roles
134 }
135 pub fn ids(&self) -> &[String] {
136 &self.ids
137 }
138 pub fn plugs(&self) -> &[String] {
139 &self.plugs
140 }
141 pub fn trees(&self) -> &[AgentTree] {
142 &self.trees
143 }
144
145 pub fn message_count(&self) -> u128 {
146 self.message_count
147 }
148
149 pub fn message_log(&self) -> &CircularBuffer<MONITOR_LOG_LENGTH, MessageLogEntry> {
150 &self.message_log
151 }
152 pub fn since_log_start(&self) -> Option<Duration> {
153 self.log_start
154 .map(|t| t.elapsed().unwrap_or(Duration::ZERO))
155 }
156
157 pub fn get_rate(&self) -> Option<f32> {
159 match self.log_start {
160 Some(t) => {
161 if let Ok(elapsed) = t.elapsed() {
162 Some(self.message_count as f32 / elapsed.as_secs_f32())
163 } else {
164 None
165 }
166 }
167 None => None,
168 }
169 }
170}
171
172fn add_if_unique(item: &str, list: &mut Vec<String>) -> bool {
173 if !list.iter().any(|i| i.eq(item)) {
174 list.push(String::from(item));
175 true
176 } else {
177 false
178 }
179}