1use std::{thread::spawn, time::SystemTime};
2
3use env_logger::{Builder, Env};
4use log::LevelFilter;
5use tether_agent::TetherAgentOptionsBuilder;
6use tether_utils::{
7 tether_playback::{PlaybackOptions, TetherPlaybackUtil},
8 tether_receive::{receive, ReceiveOptions},
9 tether_record::{RecordOptions, TetherRecordUtil},
10 tether_send::{send, SendOptions},
11 tether_topics::{insights::Insights, TopicOptions},
12};
13
14fn demo_receive() {
15 let mut tether_agent = TetherAgentOptionsBuilder::new("demoReceive")
16 .build()
17 .expect("failed to init/connect Tether Agent");
18
19 let options = ReceiveOptions::default();
20
21 receive(&options, &mut tether_agent, |_plug_name, topic, decoded| {
22 let contents = decoded.unwrap_or("(empty/invalid message)".into());
23 println!("RECEIVE: \"{}\" :: {}", topic, contents);
24 })
25}
26
27fn demo_send() {
28 let mut tether_agent = TetherAgentOptionsBuilder::new("demoSend")
29 .build()
30 .expect("failed to init/connect Tether Agent");
31
32 let mut count = 0;
33
34 let options = SendOptions {
35 plug_name: Some("dummyData".into()),
36 plug_topic: None,
37 plug_id: None,
38 plug_role: None,
39 message_payload_json: None,
40 use_dummy_data: true,
41 };
42
43 loop {
44 std::thread::sleep(std::time::Duration::from_secs(1));
45 count += 1;
46 println!("SEND: sending message #{}", count);
47 send(&options, &mut tether_agent).expect("failed to send");
48 }
49}
50
51fn demo_topics() {
52 let mut tether_agent = TetherAgentOptionsBuilder::new("demoTopics")
53 .build()
54 .expect("failed to init/connect Tether Agent");
55
56 let options = TopicOptions {
57 topic: "#".into(),
58 sampler_interval: 1000,
59 graph_enable: false,
60 };
61
62 let mut insights = Insights::new(&options, &mut tether_agent);
63
64 loop {
65 while let Some((topic, payload)) = tether_agent.check_messages() {
66 if insights.update(&topic, payload) {
67 println!("TOPICS: Insights update: \n{}", insights);
68 }
69 }
70 }
71}
72
73fn demo_playback() {
74 let options = PlaybackOptions {
75 file_path: "./demo.json".into(),
76 override_topic: None,
77 loop_count: 1, loop_infinite: true,
79 ignore_ctrl_c: true, playback_speed: 1.0,
81 topic_filters: None,
82 };
83
84 let tether_agent = TetherAgentOptionsBuilder::new("demoTopics")
85 .build()
86 .expect("failed to init/connect Tether Agent");
87
88 let player = TetherPlaybackUtil::new(options);
89 let stop_request_tx = player.get_stop_tx();
90
91 let start_time = SystemTime::now();
92
93 let handles = vec![
94 spawn(move || {
95 player.start(&tether_agent);
96 }),
97 spawn(move || {
98 let mut time_to_end = false;
99 while !time_to_end {
100 if let Ok(elapsed) = start_time.elapsed() {
101 if elapsed > std::time::Duration::from_secs(3) {
102 println!("Time to stop! {}s elapsed", elapsed.as_secs());
103 stop_request_tx
104 .send(true)
105 .expect("failed to send stop request via channel");
106 time_to_end = true;
107 }
108 }
109 }
110 println!("Playback should have stopped now; wait 1 more seconds...");
111 std::thread::sleep(std::time::Duration::from_secs(1));
112
113 println!("...Bye");
114 }),
115 ];
116 for handle in handles {
117 handle.join().expect("failed to join handle");
118 }
119}
120
121fn demo_record() {
122 let mut tether_agent = TetherAgentOptionsBuilder::new("demoPlayback")
123 .build()
124 .expect("failed to init/connect Tether Agent");
125
126 let options = RecordOptions {
127 file_override_path: None,
128 file_base_path: "./".into(),
129 file_base_name: "recording".into(),
130 file_no_timestamp: false,
131 topic: "#".into(),
132 timing_nonzero_start: false,
133 timing_delay: None,
134 timing_duration: None,
135 ignore_ctrl_c: true, };
137
138 let recorder = TetherRecordUtil::new(options);
139 let stop_request_tx = recorder.get_stop_tx();
140
141 let start_time = SystemTime::now();
142 let handles = vec![
143 spawn(move || {
144 let mut time_to_end = false;
145 while !time_to_end {
146 if let Ok(elapsed) = start_time.elapsed() {
147 if elapsed > std::time::Duration::from_secs(3) {
148 println!("Time to stop! {}s elapsed", elapsed.as_secs());
149 stop_request_tx
150 .send(true)
151 .expect("failed to send stop request via channel");
152 time_to_end = true;
153 }
154 }
155 }
156 println!("Recording should have stopped now; wait 4 more seconds...");
157 std::thread::sleep(std::time::Duration::from_secs(4));
158 println!("...Bye");
159 }),
160 spawn(move || {
161 recorder.start_recording(&mut tether_agent);
162 }),
163 ];
164
165 for handle in handles {
166 handle.join().expect("RECORDER: failed to join handle");
167 }
168}
169
170fn main() {
171 println!(
172 "This example shows how the tether-utils library can be used programmatically,
173 i.e. not from the CLI"
174 );
175 println!("Press Ctrl+C to stop");
176
177 let mut env_builder = Builder::from_env(Env::default().default_filter_or("info"));
178 env_builder.filter_module("paho_mqtt", LevelFilter::Warn);
179 env_builder.init();
180
181 let handles = vec![
182 spawn(demo_receive),
183 spawn(demo_send),
184 spawn(demo_topics),
185 spawn(|| {
186 std::thread::sleep(std::time::Duration::from_secs(4));
187 demo_playback();
188 }),
189 spawn(demo_record),
190 ];
191
192 for handle in handles {
193 handle.join().expect("failed to join handle");
194 }
195}